Transactional Producers
Difficulty: Senior Level | Companies: LinkedIn, Uber, Netflix, Spotify, Confluent
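Transactional producers enable atomic writes across multiple partitions and topics: either every record in a transaction becomes visible to read_committed consumers, or none does. Together with committing consumer offsets inside the same transaction, they are the foundation of exactly-once semantics in Kafka.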
Transaction Architecture
Transactional Producer:
┌─────────────────────────────────────────────────┐
│ Producer                                        │
│ ├── Transactional ID (configured, persistent)   │
│ ├── Producer ID (PID), assigned by coordinator  │
│ └── Epoch (monotonic, bumped on each init)      │
└─────────────────────────────────────────────────┘
                        │
                        ▼
┌─────────────────────────────────────────────────┐
│ Transaction Coordinator                         │
│ ├── Manages transaction state                   │
│ ├── Writes to __transaction_state topic         │
│ └── Writes commit/abort markers to partitions   │
└─────────────────────────────────────────────────┘
                        │
                        ▼
┌─────────────────────────────────────────────────┐
│ Brokers (partition leaders)                     │
│ ├── Receive produce requests from the producer  │
│ ├── Track PID, epoch, and sequence numbers      │
│ ├── Append records to the log immediately       │
│ └── Hide them from read_committed until commit  │
└─────────────────────────────────────────────────┘
Java Transactional Producer
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class TransactionalProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Transactional configuration. Setting transactional.id already implies
        // idempotence; the next four settings only make the required values explicit.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-tx-1");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // must be <= 5
        // The coordinator aborts a transaction that stays open longer than this
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Registers the transactional.id, obtains PID and epoch, fences older instances
        producer.initTransactions();
        try {
            producer.beginTransaction();
            // Produce to multiple topics atomically
            producer.send(new ProducerRecord<>("orders", "order-123", "{\"amount\": 99.99}"));
            producer.send(new ProducerRecord<>("inventory", "item-456", "{\"stock\": -1}"));
            producer.send(new ProducerRecord<>("payments", "pay-789", "{\"status\": \"pending\"}"));
            // Commit consumer offsets in the same transaction (exactly-once
            // read-process-write). This String overload is deprecated in recent
            // clients; with a real consumer, pass consumer.groupMetadata() instead.
            producer.sendOffsetsToTransaction(getConsumerOffsets(), "order-consumer-group");
            producer.commitTransaction();
            System.out.println("Transaction committed successfully");
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal: this producer cannot continue, so do not call abortTransaction()
            System.err.println("Fatal producer error: " + e.getMessage());
        } catch (KafkaException e) {
            // Recoverable: abort so that none of the records above become visible
            System.err.println("Transaction failed: " + e.getMessage());
            producer.abortTransaction();
        } finally {
            producer.close();
        }
    }

    // Hard-coded for illustration; a committed offset is the NEXT offset to read
    private static Map<TopicPartition, OffsetAndMetadata> getConsumerOffsets() {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition("orders", 0), new OffsetAndMetadata(100));
        return offsets;
    }
}
Python Transactional Producer
from kafka import KafkaProducer
from kafka.errors import KafkaError
import json

# Assumes a kafka-python release with transactional producer support;
# older releases do not provide init_transactions() and related methods.
class TransactionalProducer:
    def __init__(self, bootstrap_servers, transactional_id):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            transactional_id=transactional_id,
            key_serializer=lambda k: k.encode('utf-8'),
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            acks='all',
            enable_idempotence=True,
            retries=2147483647,
            max_in_flight_requests_per_connection=5
        )
        # Obtain PID and epoch from the coordinator and fence older instances
        self.producer.init_transactions()

    def send_transaction(self, records):
        """
        Send multiple records atomically.
        records: list of (topic, key, value) tuples
        """
        try:
            self.producer.begin_transaction()
            for topic, key, value in records:
                self.producer.send(topic, key=key, value=value)
            self.producer.commit_transaction()
            return True
        except KafkaError as e:
            # After a fatal error (e.g., a fenced producer) the abort fails too;
            # the producer must then be closed and recreated.
            print(f"Transaction failed: {e}")
            self.producer.abort_transaction()
            return False

    def send_with_offsets(self, records, consumer_group, offsets):
        """
        Send records and commit consumer offsets atomically.
        offsets: {TopicPartition: OffsetAndMetadata}; each offset is the
        last processed offset + 1 (the next record to read)
        """
        try:
            self.producer.begin_transaction()
            # Send records
            for topic, key, value in records:
                self.producer.send(topic, key=key, value=value)
            # Commit consumer offsets in the same transaction
            self.producer.send_offsets_to_transaction(offsets, consumer_group)
            self.producer.commit_transaction()
            return True
        except KafkaError as e:
            print(f"Transaction failed: {e}")
            self.producer.abort_transaction()
            return False

    def close(self):
        self.producer.close()

# Usage
producer = TransactionalProducer(
    bootstrap_servers='kafka1:9092',
    transactional_id='order-processor-tx-1'
)
records = [
    ('orders', 'order-123', {'amount': 99.99}),
    ('inventory', 'item-456', {'stock': -1}),
    ('payments', 'pay-789', {'status': 'pending'})
]
success = producer.send_transaction(records)
print(f"Transaction {'committed' if success else 'aborted'}")
producer.close()
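send_with_offsets() leaves it to the caller to build the offsets argument, and a common mistake is committing the offset of the last processed record instead of the next one. The sketch below uses a hypothetical helper, offsets_to_commit, and a stand-in Consumed tuple. It applies the +1 rule per partition. It returns plain (topic, partition) keys; a real call would convert these into the client's TopicPartition/OffsetAndMetadata types.

```python
from collections import namedtuple

# Hypothetical stand-in for a consumed record: only the fields needed here
Consumed = namedtuple("Consumed", ["topic", "partition", "offset"])


def offsets_to_commit(consumed):
    """Return {(topic, partition): next_offset} for a processed batch.

    Kafka's committed offset is the position of the NEXT record to read,
    so per partition it is the highest processed offset + 1.
    """
    result = {}
    for rec in consumed:
        key = (rec.topic, rec.partition)
        result[key] = max(result.get(key, 0), rec.offset + 1)
    return result


batch = [Consumed("orders", 0, 41), Consumed("orders", 0, 42), Consumed("orders", 1, 7)]
print(offsets_to_commit(batch))  # {('orders', 0): 43, ('orders', 1): 8}
```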
Transaction Coordinator Internals
Transaction State Machine:
Empty ──► Ongoing ──► PrepareCommit ──► CompleteCommit
             │
             └──────► PrepareAbort ───► CompleteAbort
CompleteCommit / CompleteAbort ──► Ongoing (next transaction) or Empty (producer re-initializes)
State Transitions:
1. Empty → Ongoing: the first send() or sendOffsetsToTransaction() after beginTransaction() registers a partition with the coordinator (initTransactions() only obtains the PID and epoch; beginTransaction() is local to the client)
2. Ongoing → PrepareCommit: commitTransaction()
3. Ongoing → PrepareAbort: abortTransaction(), or the coordinator's own abort once transaction.timeout.ms expires
4. PrepareCommit → CompleteCommit: commit markers written to every partition in the transaction
5. PrepareAbort → CompleteAbort: abort markers written to every partition in the transaction
6. CompleteCommit/CompleteAbort → Ongoing: the next transaction starts (the state returns to Empty only when the producer re-initializes)
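The transitions listed above can be encoded as a lookup table that rejects illegal moves. This is a minimal sketch, not Kafka's implementation. TRANSITIONS and TxnStateMachine are hypothetical names. Internal states such as PrepareEpochFence and Dead are omitted. The table assumes a completed transaction can move straight to Ongoing for the next transaction, or back to Empty when the producer re-initializes.

```python
# Simplified coordinator-side transition table (hypothetical model, not Kafka's code)
TRANSITIONS = {
    "Empty": {"Ongoing"},
    "Ongoing": {"PrepareCommit", "PrepareAbort"},
    "PrepareCommit": {"CompleteCommit"},
    "PrepareAbort": {"CompleteAbort"},
    # Next transaction starts, or the producer re-initializes
    "CompleteCommit": {"Ongoing", "Empty"},
    "CompleteAbort": {"Ongoing", "Empty"},
}


class TxnStateMachine:
    def __init__(self):
        self.state = "Empty"

    def transition(self, new_state):
        # Reject any move the table does not allow
        if new_state not in TRANSITIONS[self.state]:
            raise ValueError(f"illegal transition {self.state} -> {new_state}")
        self.state = new_state


sm = TxnStateMachine()
for step in ["Ongoing", "PrepareCommit", "CompleteCommit",   # a committed transaction
             "Ongoing", "PrepareAbort", "CompleteAbort"]:    # an aborted transaction
    sm.transition(step)
print(sm.state)  # CompleteAbort

try:
    sm.transition("PrepareCommit")  # cannot commit without an ongoing transaction
except ValueError as err:
    print(err)  # illegal transition CompleteAbort -> PrepareCommit
```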
Transaction Coordinator Failure
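Scenario: the transaction coordinator fails during a transaction
1. The producer has sent records in an ongoing transaction
2. The broker acting as its coordinator fails
3. Leadership of the affected __transaction_state partition moves to another broker, which becomes the new coordinator and reloads the transaction state from that partition
4. The producer rediscovers the coordinator and retries its pending request against it
5. If the transaction was already in PrepareCommit or PrepareAbort, the new coordinator finishes writing the markers; if it stays Ongoing past transaction.timeout.ms, the coordinator aborts it
6. Consumers with isolation.level=read_committed never see records from an aborted transaction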
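This failover works because each transactional.id is owned by exactly one partition of __transaction_state. The coordinator is the broker that currently leads that partition. As far as we know, the broker chooses the partition from the Java String.hashCode() of the id, made non-negative, modulo transaction.state.log.num.partitions (default 50). The sketch below reproduces that mapping under that assumption. java_string_hash and coordinator_partition are hypothetical helper names.

```python
def java_string_hash(s: str) -> int:
    """Java's String.hashCode() as a signed 32-bit int (assumes BMP characters)."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF   # wrap like Java int arithmetic
    return h - (1 << 32) if h >= (1 << 31) else h


def coordinator_partition(transactional_id: str, num_partitions: int = 50) -> int:
    """__transaction_state partition assumed to own this id (hypothetical helper)."""
    h = java_string_hash(transactional_id)
    # Make the hash non-negative; Integer.MIN_VALUE has no positive counterpart, so map it to 0
    h = 0 if h == -(1 << 31) else abs(h)
    return h % num_partitions


# The leader of this partition is the coordinator for the id; after a failover,
# whichever broker becomes the new leader of the same partition takes over.
print(coordinator_partition("order-processor-tx-1"))
```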
Exactly-Once Consumer
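import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

// Consumer must read committed messages
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Read only committed messages: records from aborted or still-open transactions are not returned
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record); // application logic (not shown)
    }
    // Committing after processing is at-least-once for side effects outside Kafka:
    // processRecord must be idempotent, or its output must be written transactionally.
    consumer.commitSync();
}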
Isolation Levels
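read_uncommitted (default):
- Returns all records up to the high watermark, including records from open and aborted transactions
- Lower latency
- Transaction control markers are never returned to the application
read_committed:
- Returns only non-transactional records and records from committed transactions
- Higher latency: reads stop at the last stable offset (LSO), so an open transaction delays every later record in that partition until it commits or aborts
- Records from aborted transactions are filtered out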
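The difference between the two levels can be modeled on a single partition log. This is a toy model with hypothetical names (resolve, fetch), and it simplifies what brokers actually do with the last stable offset and the aborted-transaction index. Each entry is either a data record tagged with a producer ID (None for non-transactional writes) or a commit/abort marker for that producer. The model assumes every entry is already replicated.

```python
def resolve(log):
    """Map each transactional data offset to 'commit'/'abort' and compute the LSO."""
    outcome, pending = {}, {}
    for off, entry in enumerate(log):
        if entry[0] == "data":
            pid = entry[1]
            if pid is not None:              # None = non-transactional record
                pending.setdefault(pid, []).append(off)
        else:                                # "commit" or "abort" control marker
            for o in pending.pop(entry[1], []):
                outcome[o] = entry[0]
    open_offsets = [o for offs in pending.values() for o in offs]
    # LSO: first offset of the earliest still-open transaction, else end of log
    lso = min(open_offsets) if open_offsets else len(log)
    return outcome, lso


def fetch(log, isolation_level):
    if isolation_level == "read_uncommitted":
        # Every data record; control markers are never returned
        return [e[2] for e in log if e[0] == "data"]
    outcome, lso = resolve(log)
    # read_committed: stop at the LSO and drop records of aborted transactions
    return [e[2] for off, e in enumerate(log[:lso])
            if e[0] == "data" and (e[1] is None or outcome.get(off) == "commit")]


log = [
    ("data", None, "a"),   # non-transactional record
    ("data", 1, "b"),      # producer 1, later committed
    ("data", 2, "c"),      # producer 2, later aborted
    ("abort", 2),
    ("data", 3, "d"),      # producer 3, transaction still open
    ("commit", 1),
    ("data", None, "e"),   # blocked by producer 3's open transaction
]
print(fetch(log, "read_uncommitted"))  # ['a', 'b', 'c', 'd', 'e']
print(fetch(log, "read_committed"))    # ['a', 'b']
```

Note that "e" is not aborted. It is simply held back until producer 3's transaction finishes, which is where read_committed's extra latency comes from.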
Transaction Patterns
1. Read-Process-Write Pattern
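public void readProcessWrite() {
    // createConsumer() must set enable.auto.commit=false and
    // isolation.level=read_committed; createTransactionalProducer() must set a
    // transactional.id. These helpers, process(), and OutputMessage are not shown.
    KafkaConsumer<String, String> consumer = createConsumer();
    KafkaProducer<String, String> producer = createTransactionalProducer();
    producer.initTransactions();
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        if (records.isEmpty()) continue;
        try {
            producer.beginTransaction();
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (ConsumerRecord<String, String> record : records) {
                // Process
                OutputMessage output = process(record.value());
                // Write
                producer.send(new ProducerRecord<>(
                    "output-topic",
                    record.key(),
                    output.toString()
                ));
                // Commit the NEXT offset to read, hence +1
                offsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
            }
            // Commit consumer offsets in the same transaction as the output
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException
                 | AuthorizationException e) {
            // Fatal: this producer cannot recover, so close it instead of aborting
            producer.close();
            throw e;
        } catch (KafkaException e) {
            producer.abortTransaction();
            // The consumer's position already moved past this batch; rewind it
            // so the aborted records are processed again
            for (TopicPartition tp : records.partitions()) {
                consumer.seek(tp, records.records(tp).get(0).offset());
            }
        }
    }
}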
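What this pattern guarantees can be checked in a toy in-memory model. The output records and the consumer-group offset change together or not at all, so a failure mid-batch causes reprocessing but no duplicates in the committed output. ToyPipeline is hypothetical and stands in for Kafka; it is not the client API. Uppercasing stands in for the "process" step.

```python
class ToyPipeline:
    """Hypothetical in-memory stand-in: one input partition, one output topic,
    and one consumer-group offset. Not the Kafka client API."""

    def __init__(self, inputs):
        self.inputs = inputs
        self.output = []            # what read_committed readers of the output see
        self.committed_offset = 0   # next input offset to process

    def run_transaction(self, batch_size, crash_at=None):
        start = self.committed_offset
        batch = self.inputs[start:start + batch_size]
        pending = []                # writes made inside the open transaction
        for i, value in enumerate(batch):
            if crash_at == i:
                # Abort: pending writes never become visible and the offset does not move
                return False
            pending.append(value.upper())   # the "process" step
        # Commit: output records and the new offset become visible together
        self.output.extend(pending)
        self.committed_offset = start + len(batch)
        return True


p = ToyPipeline(["a", "b", "c", "d"])
p.run_transaction(2)                  # commits A, B and offset 2
p.run_transaction(2, crash_at=1)      # processes "c", then fails before committing
p.run_transaction(2)                  # restarts from offset 2
print(p.output, p.committed_offset)   # ['A', 'B', 'C', 'D'] 4
```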
2. Multi-Topic Write Pattern
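public void multiTopicWrite() {
    // producer: a transactional KafkaProducer on which initTransactions() was already called
    producer.beginTransaction();
    // Write to multiple topics atomically: read_committed consumers see all three or none
    producer.send(new ProducerRecord<>("topic-a", "key", "value"));
    producer.send(new ProducerRecord<>("topic-b", "key", "value"));
    producer.send(new ProducerRecord<>("topic-c", "key", "value"));
    producer.commitTransaction(); // on a recoverable KafkaException, call abortTransaction() instead
}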
3. Transactional Idempotent Writes
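public void transactionalIdempotent() {
    producer.beginTransaction();
    for (Record record : records) { // Record and records: application types (not shown)
        // Each batch is stamped with the producer's PID, epoch, and a per-partition
        // sequence number. If a batch is retried (e.g., after a lost ack), the broker
        // sees a sequence it already wrote and does not append it again. This guards
        // against retry duplicates, not against calling send() twice with the same data.
        producer.send(new ProducerRecord<>("topic", record.key, record.value));
    }
    producer.commitTransaction();
}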
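The broker-side check can be sketched as a simplified model. ToyPartition and OutOfOrderSequence are hypothetical names. The model assumes one sequence number per record, keyed by (PID, epoch). Real brokers track sequence ranges per batch and remember only the most recent batches per producer. The last call shows the limit noted above: a second send() by the application gets a fresh sequence number and is appended again.

```python
class OutOfOrderSequence(Exception):
    pass


class ToyPartition:
    """Hypothetical broker-side dedup for one partition, keyed by (PID, epoch)."""

    def __init__(self):
        self.log = []
        self.last_seq = {}   # (pid, epoch) -> last appended sequence number

    def append(self, pid, epoch, seq, value):
        last = self.last_seq.get((pid, epoch), -1)
        if seq <= last:
            return False     # duplicate (a retry): acknowledged but not appended again
        if seq != last + 1:
            raise OutOfOrderSequence(f"expected {last + 1}, got {seq}")
        self.log.append(value)
        self.last_seq[(pid, epoch)] = seq
        return True


part = ToyPartition()
part.append(1000, 0, 0, "x")
part.append(1000, 0, 1, "y")
part.append(1000, 0, 1, "y")   # retry after a lost ack: deduplicated
print(part.log)                # ['x', 'y']
part.append(1000, 0, 2, "y")   # application called send() again: new sequence, appended
print(part.log)                # ['x', 'y', 'y']
```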
Monitoring Transactions
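import time
from prometheus_client import Counter, Histogram
# Transaction metrics
transaction_commits = Counter(
    'kafka_transaction_commits_total',
    'Total transaction commits',
    ['transactional_id']
)
transaction_aborts = Counter(
    'kafka_transaction_aborts_total',
    'Total transaction aborts',
    ['transactional_id']
)
# A Histogram keeps the latency distribution; a Gauge would only hold the last value
transaction_latency = Histogram(
    'kafka_transaction_latency_seconds',
    'Transaction commit latency',
    ['transactional_id']
)
# Record the outcome and latency of each commit
def timed_commit(producer, transactional_id):
    start = time.monotonic()
    try:
        producer.commit_transaction()
        transaction_commits.labels(transactional_id).inc()
    except Exception:
        producer.abort_transaction()
        transaction_aborts.labels(transactional_id).inc()
        raise
    finally:
        transaction_latency.labels(transactional_id).observe(time.monotonic() - start)
# Coordinator-side state (e.g., long-running or hanging transactions) is
# available via broker JMX metrics or the Java AdminClient.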
Common Pitfalls
1. Transaction Timeout
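// Problem: processing takes longer than transaction.timeout.ms, so the coordinator
// aborts the transaction and the producer's next send or commit fails
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10000); // 10s
// Solution: increase the timeout (it must not exceed the broker's
// transaction.max.timeout.ms, 15 min by default) or use smaller transactions.
// A longer timeout also lets a stuck transaction block read_committed consumers longer.
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 300000); // 5min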
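Choosing the value is simple arithmetic: take the worst-case time to process and produce one transaction, add headroom, and stay under the broker's transaction.max.timeout.ms (default 900000 ms). A larger producer value makes initTransactions() fail. The helper below is hypothetical. The per-record cost and the headroom factor are assumptions you would measure for your own workload.

```python
# Broker default for transaction.max.timeout.ms (15 minutes)
BROKER_MAX_TXN_TIMEOUT_MS = 900_000


def pick_transaction_timeout_ms(records_per_txn, worst_ms_per_record,
                                headroom=2.0, broker_max_ms=BROKER_MAX_TXN_TIMEOUT_MS):
    """Hypothetical sizing rule: worst-case transaction duration times a headroom factor."""
    needed = int(records_per_txn * worst_ms_per_record * headroom)
    if needed > broker_max_ms:
        # The broker would reject this value, so make transactions smaller instead
        raise ValueError(f"need {needed} ms but broker max is {broker_max_ms} ms; "
                         "commit in smaller transactions")
    return needed


print(pick_transaction_timeout_ms(500, 100))  # 100000 (500 records x 100 ms x 2)
```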
2. Multiple Instances with Same Transactional ID
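Problem: Two producers with the same transactional.id
- Only one can be active at a time: each initTransactions() bumps the epoch for that id
- The instance with the older epoch gets ProducerFencedException on its next transactional request
Solution: Use a unique transactional.id per instance that stays the same across restarts of that instance, so a restarted instance fences its own zombie predecessor
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
    "processor-" + instanceId); // instanceId: stable per instance (e.g., a pod ordinal)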
3. Transaction State Topic Cleanup
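Problem: __transaction_state keeps an entry per transactional.id, so ids generated randomly on every restart pile up
Note: the topic is compacted by default, and the coordinator expires ids idle for longer than transactional.id.expiration.ms (default 7 days)
Solution:
- Reuse stable transactional.ids instead of generating new ones per run
- Keep the default compact cleanup policy; tune transactional.id.expiration.ms if short-lived ids are unavoidable
- Monitor topic size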
Follow-Up Questions
- What is the difference between transactional.id and producer.id?
- How does Kafka handle transaction coordinator failures?
- Explain the purpose of the epoch in transactional producers.
- How would you implement idempotent consumer processing?
- What are the limitations of Kafka transactions?