Apache Kafka Exactly-Once Semantics: Transactions, Idempotency, and Consistency
Architecture Diagram: Exactly-Once Semantics Components
EXACTLY-ONCE SEMANTICS ARCHITECTURE & COMPONENTS

PRODUCER LAYER (client)
  Transaction Manager : beginTransaction() / commitTransaction() / abortTransaction()
  Idempotence         : attaches PID, epoch, and sequence numbers; duplicate detection happens on the broker
  Sequence Numbering  : incremented per partition (per record)
  Producer ID (PID)   : obtained from a broker via InitProducerId (from the transaction coordinator when transactional.id is set)
  Epoch               : bumped by initTransactions() and when the coordinator fences or times out a transaction
      |
      v
TRANSACTION COORDINATOR (leader of the __transaction_state partition for the transactional.id)
  __transaction_state log : Empty / Ongoing / PrepareCommit / PrepareAbort / CompleteCommit / CompleteAbort
  Transaction metadata    : PID: 1234, Epoch: 1, Partitions: [p0, p1, p2]
  Timeout handler         : aborts transactions open longer than transaction.timeout.ms (default 60s)
  State machine           : Empty --> Ongoing --> PrepareCommit --> CompleteCommit
      |
      v
BROKER LAYER (partition leaders)
  Broker 0: partitions 0, 4 | Broker 1: partitions 1, 5 | Broker 2: partitions 2, 6 | Broker 3: partitions 3, 7
  Producer state kept on each partition: PID: 1234, Epoch: 1, Seq: 0
Architecture Diagram: Transaction Lifecycle State Machine
TRANSACTION LIFECYCLE STATE MACHINE (coordinator view)

EMPTY (producer initialized by initTransactions(); no active transaction)
   |  beginTransaction() is client-side only; the first send() to a partition
   |  triggers AddPartitionsToTxn
   v
ONGOING (transaction in progress)
   - Producer sends records
   - Partitions are added to the transaction
   - Consumed offsets may be added via sendOffsetsToTransaction()
   |  commitTransaction() (EndTxn request)
   v
PREPARE_COMMIT (PrepareCommit written to __transaction_state; the commit is now decided)
   - Coordinator writes COMMIT markers to every partition in the transaction
   - Markers are replicated before the transaction completes
   |  all markers written
   v
COMPLETE_COMMIT (transaction committed)
   - CompleteCommit written to __transaction_state
   |  next transaction: the first AddPartitionsToTxn moves the state back to ONGOING
   |  (EMPTY is re-entered only through initTransactions())
   v
ONGOING ...

ABORT PATH:
   ONGOING --> abortTransaction() --> PREPARE_ABORT --> ABORT markers written --> COMPLETE_ABORT

TIMEOUT PATH:
   ONGOING --> transaction.timeout.ms exceeded (default 60s) --> coordinator bumps the epoch (fences the producer) --> PREPARE_ABORT --> COMPLETE_ABORT
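The coordinator-side transitions in the diagram can be sketched as a small Java state machine. This is a simplified model written for illustration, not Kafka's own TransactionState class: the enum and method names are our own, and the real coordinator has extra states (such as Dead and PrepareEpochFence) that are omitted here.

```java
import java.util.EnumSet;
import java.util.Set;

// Simplified model of coordinator-side transaction states (hypothetical names;
// Dead and PrepareEpochFence from the real coordinator are omitted).
final class TxnStateMachine {
    enum TxnState {
        EMPTY, ONGOING, PREPARE_COMMIT, PREPARE_ABORT, COMPLETE_COMMIT, COMPLETE_ABORT;

        // States from which a transition INTO this state is allowed.
        Set<TxnState> validPrevious() {
            switch (this) {
                case EMPTY:           return EnumSet.of(EMPTY, COMPLETE_COMMIT, COMPLETE_ABORT);
                case ONGOING:         return EnumSet.of(EMPTY, ONGOING, COMPLETE_COMMIT, COMPLETE_ABORT);
                case PREPARE_COMMIT:
                case PREPARE_ABORT:   return EnumSet.of(ONGOING);
                case COMPLETE_COMMIT: return EnumSet.of(PREPARE_COMMIT);
                case COMPLETE_ABORT:  return EnumSet.of(PREPARE_ABORT);
                default:              throw new AssertionError(this);
            }
        }
    }

    private TxnState state = TxnState.EMPTY;

    TxnState state() {
        return state;
    }

    // Rejects any transition the model does not allow.
    void transitionTo(TxnState next) {
        if (!next.validPrevious().contains(state)) {
            throw new IllegalStateException("Invalid transition " + state + " -> " + next);
        }
        state = next;
    }

    public static void main(String[] args) {
        TxnStateMachine txn = new TxnStateMachine();
        txn.transitionTo(TxnState.ONGOING);         // first AddPartitionsToTxn
        txn.transitionTo(TxnState.PREPARE_COMMIT);  // EndTxn(commit): decision logged
        txn.transitionTo(TxnState.COMPLETE_COMMIT); // all COMMIT markers written
        txn.transitionTo(TxnState.ONGOING);         // next transaction starts directly
        txn.transitionTo(TxnState.PREPARE_ABORT);   // abortTransaction() or timeout
        txn.transitionTo(TxnState.COMPLETE_ABORT);
        try {
            txn.transitionTo(TxnState.PREPARE_COMMIT); // cannot commit a finished transaction
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        System.out.println("final state: " + txn.state());
    }
}
```

Running main prints the rejected COMPLETE_ABORT -> PREPARE_COMMIT transition followed by final state: COMPLETE_ABORT. Storing the allowed predecessors of each state keeps every check to a single set lookup.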
Architecture Diagram: Idempotent Producer Deduplication
IDEMPOTENT PRODUCER DEDUPLICATION MECHANISM

PRODUCER SIDE: in-flight request tracker
  Request 1: PID=1234, Epoch=1, Seq=0  [ACKED]    --> removed from tracker
  Request 2: PID=1234, Epoch=1, Seq=1  [PENDING]  --> in flight
  Request 3: PID=1234, Epoch=1, Seq=2  [PENDING]  --> in flight
  Request 4: PID=1234, Epoch=1, Seq=3  [QUEUED]   --> waiting for a slot
  max.in.flight.requests.per.connection = 5
  (must be at most 5 with idempotence; ordering is preserved even with retries)
      |
      v
BROKER SIDE DEDUPLICATION: per-partition producer state
  Partition 0: PID 1234, Epoch 1, last appended Seq 5; metadata of the last 5 batches is retained
    Incoming PID=1234, Epoch=1, Seq=6 --> APPEND (next expected sequence)
    Incoming PID=1234, Epoch=1, Seq=6 --> DUPLICATE (matches a retained batch; acknowledged, not re-appended)
    Incoming PID=1234, Epoch=1, Seq=4 --> DUPLICATE while its batch is still retained; once evicted, rejected as out of order
    Incoming PID=1234, Epoch=1, Seq=8 --> REJECT (gap: OUT_OF_ORDER_SEQUENCE_NUMBER; the producer resends in order)
    Incoming PID=1234, Epoch=0, Seq=6 --> REJECT (stale epoch: producer fenced)

  Producer state table (per partition):
    PID  | Epoch | Last Seq | Retained batch metadata
    1234 | 1     | 5        | up to 5 most recent batches
    5678 | 2     | 10       | up to 5 most recent batches
Formal Definitions
Definition: Exactly-Once Semantics (EOS)
Exactly-once semantics guarantees that the effect of each message is reflected exactly once: no duplicates and no data loss. In Kafka, EOS is achieved through three coordinated mechanisms: (1) idempotent producers that prevent duplicate writes per partition, (2) transactional producers that enable atomic multi-partition writes (including the consumer offsets of the records being processed), and (3) consumer isolation levels that filter out records from aborted and still-open transactions. End-to-end EOS requires all three components working together.
Definition: Idempotent Producer
An idempotent producer ensures that a message is written to a partition exactly once, even if the producer retries. On startup each producer obtains a Producer ID (PID) and epoch from the brokers. For each partition, the broker tracks the last sequence number appended per PID; a retry carrying sequence numbers that were already appended is recognized as a duplicate and acknowledged without being written again. The guarantee holds within one producer session (one PID and epoch). Formally: for producer P with PID=p, the broker appends message (p, seq) only if seq = lastAcceptedSeq(p) + 1.
Definition: Transactional Producer
A transactional producer enables atomic writes across multiple partitions and topics. A transaction groups multiple produce requests into a single atomic unit: either all messages in the transaction are committed (visible to read_committed consumers) or none are (aborted and invisible). The Transaction Coordinator manages the two-phase commit protocol using the __transaction_state topic.
Key Formulas
Idempotent Deduplication Condition
$$\text{append}(p, s) \iff s = \text{lastSeq}(p) + 1$$
Here,
- $p$ = Producer ID
- $s$ = Sequence number of the incoming message
- $\text{lastSeq}(p)$ = Last accepted sequence number for this PID on this partition
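Deduplication Window
$$\text{duplicate}(p, s) \iff s \in \text{Retained}_5(p), \qquad \max \text{Retained}_5(p) = \text{lastSeq}(p), \qquad m \le 5$$
Here,
- $m$ = max.in.flight.requests.per.connection (default 5; must not exceed 5 when idempotence is enabled)
- $\text{lastSeq}(p)$ = Last appended sequence number for PID $p$
- $\text{Retained}_5(p)$ = Sequence numbers of the last 5 batches appended for PID $p$ on this partition

A batch that is neither a duplicate nor the next expected one ($s = \text{lastSeq}(p) + 1$) is rejected as out of order rather than buffered. Because at most $m \le 5$ requests are unacknowledged at any time, a retried batch is normally either a retained duplicate or the next expected batch.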
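A minimal sketch of the broker-side check, assuming one-record batches in the demo and ignoring sequence wrap-around and log persistence. The class, enum, and method names are hypothetical; only the rules (append the next sequence, acknowledge a retry of one of the last 5 retained batches, reject gaps and stale epochs) follow the description above.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of one partition leader's per-producer sequence checks.
final class PartitionDedupSketch {
    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER, FENCED }

    static final int RETAINED_BATCHES = 5; // batch metadata kept per producer

    // Per-PID state; each retained batch is stored as {firstSeq, lastSeq}.
    static final class ProducerState {
        int epoch;
        int lastSeq = -1;
        final Deque<int[]> batches = new ArrayDeque<>();

        ProducerState(int epoch) {
            this.epoch = epoch;
        }
    }

    private final Map<Long, ProducerState> producers = new HashMap<>();

    Result append(long pid, int epoch, int firstSeq, int recordCount) {
        ProducerState s = producers.computeIfAbsent(pid, k -> new ProducerState(epoch));
        if (epoch < s.epoch) {
            return Result.FENCED;                 // stale epoch from a fenced instance
        }
        if (epoch > s.epoch) {
            if (firstSeq != 0) {
                return Result.OUT_OF_ORDER;       // a new epoch must start at sequence 0
            }
            s.epoch = epoch;
            s.lastSeq = -1;
            s.batches.clear();
        }
        int batchLastSeq = firstSeq + recordCount - 1;
        for (int[] b : s.batches) {
            if (b[0] == firstSeq && b[1] == batchLastSeq) {
                return Result.DUPLICATE;          // retry of a retained batch: ack, don't append
            }
        }
        if (firstSeq != s.lastSeq + 1) {
            return Result.OUT_OF_ORDER;           // gap, or a duplicate already evicted
        }
        s.batches.addLast(new int[] {firstSeq, batchLastSeq});
        if (s.batches.size() > RETAINED_BATCHES) {
            s.batches.removeFirst();
        }
        s.lastSeq = batchLastSeq;
        return Result.APPENDED;
    }

    public static void main(String[] args) {
        PartitionDedupSketch partition = new PartitionDedupSketch();
        for (int seq = 0; seq <= 5; seq++) {
            partition.append(1234L, 1, seq, 1);   // one-record batches, seq 0..5
        }
        System.out.println(partition.append(1234L, 1, 6, 1)); // APPENDED
        System.out.println(partition.append(1234L, 1, 6, 1)); // DUPLICATE
        System.out.println(partition.append(1234L, 1, 4, 1)); // DUPLICATE (still retained)
        System.out.println(partition.append(1234L, 1, 0, 1)); // OUT_OF_ORDER (evicted)
        System.out.println(partition.append(1234L, 1, 8, 1)); // OUT_OF_ORDER (gap)
        System.out.println(partition.append(1234L, 0, 7, 1)); // FENCED
    }
}
```

Note that the broker never holds data back: every batch is appended, acknowledged as a duplicate, or rejected immediately.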
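For a transactional producer, the Transaction Coordinator maps the transactional.id to a PID and persists this mapping in __transaction_state. When a producer with the same transactional.id calls initTransactions() (for example after a restart), it receives the same PID with an incremented epoch, which fences (invalidates) any older instance still running. This prevents split-brain scenarios where a "zombie" instance and its replacement both write or commit under the same transactional.id. A producer that is idempotent but not transactional simply receives a fresh PID on every start, so its duplicate protection does not span restarts.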
Theorem: Ordering Guarantee with Idempotence
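With idempotence enabled, Kafka guarantees that, within a partition, each message from one producer session is appended at most once and in send order, even with retries and with max.in.flight.requests.per.connection up to 5. The broker does not buffer out-of-order messages: it appends a batch only when its first sequence number is lastSeq(p) + 1, acknowledges retried duplicates without re-appending them, and rejects batches that leave a gap with OUT_OF_ORDER_SEQUENCE_NUMBER, which the producer then resends in order. Formally: for a producer with PID=p sending messages s_1, s_2, ..., s_k to partition P, a consumer on P observes them in order s_1, s_2, ..., s_k, each exactly once, provided every send is eventually acknowledged.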
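The ordering argument can be checked with a small simulation under stated assumptions: five batches are pipelined (matching the default max.in.flight.requests.per.connection), the request carrying one batch is lost in transit, and the broker is reduced to a next-expected-sequence check. All names are hypothetical; the real producer's retry logic is more involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of pipelined batches where one request is lost in transit.
final class OrderedRetrySketch {
    // Minimal broker: appends only the next expected sequence number.
    static final class Broker {
        final List<Integer> log = new ArrayList<>();
        int lastSeq = -1;

        boolean append(int seq) {
            if (seq <= lastSeq) {
                return true;                    // simplification: treat as an already-appended duplicate
            }
            if (seq != lastSeq + 1) {
                return false;                   // gap: OUT_OF_ORDER_SEQUENCE_NUMBER
            }
            log.add(seq);
            lastSeq = seq;
            return true;
        }
    }

    // Sends batches 0..inFlight-1 back to back; the request for lostSeq never arrives.
    static List<Integer> run(int inFlight, int lostSeq) {
        Broker broker = new Broker();
        List<Integer> toResend = new ArrayList<>();
        for (int seq = 0; seq < inFlight; seq++) {
            if (seq == lostSeq || !broker.append(seq)) {
                toResend.add(seq);
            }
        }
        // The producer resends failed batches in their original sequence order.
        for (int seq : toResend) {
            if (!broker.append(seq)) {
                throw new IllegalStateException("retry rejected: " + seq);
            }
        }
        return broker.log;
    }

    public static void main(String[] args) {
        System.out.println(run(5, 1)); // [0, 1, 2, 3, 4]
    }
}
```

Batches 2 to 4 are rejected rather than buffered, so the log can only grow in sequence order.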
Theorem: Exactly-Once Semantics (Proof Sketch)
Claim: Kafka achieves end-to-end exactly-once semantics when: (1) the producer uses idempotence + transactions and commits the consumed input offsets in the same transaction (sendOffsetsToTransaction()), (2) consumers use isolation.level=read_committed, (3) sink operations outside Kafka are idempotent.
Proof sketch:
- Source deduplication: The idempotent producer ensures each message is written exactly once per partition (by sequence number tracking).
- Atomic multi-partition write: The transaction coordinator ensures that all messages in a transaction, together with the consumed offsets, are either all committed or all aborted (two-phase commit via __transaction_state).
- Consumer filtering: read_committed consumers read only up to the LSO (Last Stable Offset) and skip records from aborted transactions, so they see only committed data.
- Sink idempotency: If the sink operation (database write, HTTP call) is idempotent, reprocessing the same committed message produces the same result.
- Combined: The composition of these four properties yields exactly-once end-to-end delivery. □
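One common way to make a sink idempotent, sketched here as an assumption rather than a Kafka API, is to store the last applied offset per topic-partition next to the data and skip anything at or below it. The in-memory maps stand in for database tables, records of one partition are assumed to arrive in offset order, and in a real sink both updates must happen in the same local database transaction.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sink: in a real database, both maps would be updated in one local transaction.
final class IdempotentSinkSketch {
    private final Map<String, Long> balancesCents = new HashMap<>();
    private final Map<String, Long> lastAppliedOffset = new HashMap<>(); // key: "topic-partition"

    /** Applies a record once; returns false if this offset was already applied. */
    boolean apply(String topicPartition, long offset, String orderId, long amountCents) {
        Long last = lastAppliedOffset.get(topicPartition);
        if (last != null && offset <= last) {
            return false;                                   // replayed record: no-op
        }
        balancesCents.merge(orderId, amountCents, Long::sum);
        lastAppliedOffset.put(topicPartition, offset);
        return true;
    }

    long balance(String orderId) {
        return balancesCents.getOrDefault(orderId, 0L);
    }

    public static void main(String[] args) {
        IdempotentSinkSketch sink = new IdempotentSinkSketch();
        sink.apply("order-events-0", 0, "order-1", 1000);
        sink.apply("order-events-0", 1, "order-1", 500);
        // Consumer restarts before its offset commit: the same records are delivered again.
        sink.apply("order-events-0", 0, "order-1", 1000);
        sink.apply("order-events-0", 1, "order-1", 500);
        System.out.println(sink.balance("order-1")); // 1500
    }
}
```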
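Transaction timeout (transaction.timeout.ms, default 60s) determines how long a transaction can remain open. If the producer crashes during a transaction, the coordinator aborts it after the timeout and bumps the producer epoch. Keep transactions short: an open transaction holds back the LSO of every partition it touches, which stalls read_committed consumers on those partitions, and long transactions are more likely to hit the timeout.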
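The timeout rule can be sketched as follows. This is a hypothetical model, not broker code. The constants reflect the documented defaults of transaction.timeout.ms (producer, 60000 ms) and transaction.max.timeout.ms (broker, 900000 ms); the broker refuses to initialize a producer whose timeout exceeds that maximum.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the coordinator's timeout handling.
final class TxnTimeoutSketch {
    static final long DEFAULT_TXN_TIMEOUT_MS = 60_000;   // producer transaction.timeout.ms default
    static final long MAX_TXN_TIMEOUT_MS = 900_000;      // broker transaction.max.timeout.ms default

    static final class OpenTxn {
        final String transactionalId;
        final long startMs;
        final long timeoutMs;

        OpenTxn(String transactionalId, long startMs, long timeoutMs) {
            this.transactionalId = transactionalId;
            this.startMs = startMs;
            this.timeoutMs = timeoutMs;
        }
    }

    // The broker refuses producer timeouts above its configured maximum.
    static void validateTimeout(long timeoutMs) {
        if (timeoutMs <= 0 || timeoutMs > MAX_TXN_TIMEOUT_MS) {
            throw new IllegalArgumentException("invalid transaction.timeout.ms: " + timeoutMs);
        }
    }

    // Transactions the coordinator would abort (bumping the epoch) at time nowMs.
    static List<String> expired(List<OpenTxn> open, long nowMs) {
        List<String> result = new ArrayList<>();
        for (OpenTxn t : open) {
            if (t.startMs + t.timeoutMs < nowMs) {
                result.add(t.transactionalId);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        validateTimeout(DEFAULT_TXN_TIMEOUT_MS);
        List<OpenTxn> open = new ArrayList<>();
        open.add(new OpenTxn("order-producer-1", 0, DEFAULT_TXN_TIMEOUT_MS));      // producer crashed at t=0
        open.add(new OpenTxn("order-producer-2", 30_000, DEFAULT_TXN_TIMEOUT_MS)); // still within its timeout
        System.out.println(expired(open, 61_000)); // [order-producer-1]
    }
}
```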
Detailed Explanation
Exactly-once semantics (EOS) in Kafka ensures that each message is delivered exactly once to the consumer, with no duplicates and no data loss. This is achieved through three coordinated mechanisms: idempotent producers, transactional producers, and consumer isolation levels. The implementation requires careful coordination between producers, brokers, and consumers to guarantee end-to-end exactly-once delivery.
Idempotent Producers prevent duplicate messages from being written during retries. When enabled (enable.idempotence=true), each producer obtains a Producer ID (PID) and an epoch number. For each partition, the broker tracks the sequence number of the last record appended from each PID, along with metadata for that producer's last 5 batches. Each produce request carries the PID, epoch, and sequence numbers. If a batch's first sequence number is the next expected value, the broker appends it; if it matches one of the 5 retained batches, it is a retried duplicate and the broker acknowledges it without writing it again; if it leaves a gap, the broker rejects it with an out-of-order sequence error and the producer resends it after the missing batch. Because only 5 batches are retained, idempotence requires max.in.flight.requests.per.connection to be at most 5 (the default); the broker never buffers out-of-order batches.
The Producer ID and Epoch mechanism handles producer restarts. When a transactional producer calls initTransactions() (for example after a restart), the coordinator returns the same PID for its transactional.id with the epoch incremented; brokers then reject writes carrying the older epoch, and the producer's sequence numbers restart from 0 under the new epoch. The coordinator also bumps the epoch when it aborts a timed-out transaction. Partition leadership changes do not change the producer epoch. This fencing ensures that only one instance per transactional.id can write and commit at a time, preventing split-brain scenarios where a zombie instance and its replacement both write conflicting data.
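Epoch fencing can be illustrated with two hypothetical classes: a coordinator that keeps one PID per transactional.id and increments its epoch on every initTransactions(), and a partition leader that rejects writes whose epoch is lower than the highest it has seen for that PID. In Kafka the leader learns the new epoch from the new instance's writes or from the abort markers the coordinator writes while fencing; this sketch simply has the new instance write first, and ignores epoch overflow and persistence.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of epoch-based fencing (epoch overflow and persistence ignored).
final class EpochFencingSketch {
    static final class ProducerIdAndEpoch {
        final long producerId;
        final int epoch;

        ProducerIdAndEpoch(long producerId, int epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    // Coordinator: keeps one PID per transactional.id and bumps its epoch on every init.
    static final class Coordinator {
        private final Map<String, ProducerIdAndEpoch> byTransactionalId = new HashMap<>();
        private long nextProducerId = 1000;

        ProducerIdAndEpoch initTransactions(String transactionalId) {
            ProducerIdAndEpoch current = byTransactionalId.get(transactionalId);
            ProducerIdAndEpoch next = (current == null)
                    ? new ProducerIdAndEpoch(nextProducerId++, 0)
                    : new ProducerIdAndEpoch(current.producerId, current.epoch + 1);
            byTransactionalId.put(transactionalId, next);
            return next;
        }
    }

    // Partition leader: rejects writes whose epoch is below the highest seen for that PID.
    static final class PartitionLeader {
        private final Map<Long, Integer> latestEpoch = new HashMap<>();

        boolean write(ProducerIdAndEpoch producer) {
            int latest = latestEpoch.getOrDefault(producer.producerId, -1);
            if (producer.epoch < latest) {
                return false;                              // zombie: fenced
            }
            latestEpoch.put(producer.producerId, producer.epoch);
            return true;
        }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        PartitionLeader leader = new PartitionLeader();
        ProducerIdAndEpoch oldInstance = coordinator.initTransactions("order-producer-1");
        leader.write(oldInstance);
        ProducerIdAndEpoch newInstance = coordinator.initTransactions("order-producer-1"); // restart
        leader.write(newInstance);
        System.out.println("same PID: " + (oldInstance.producerId == newInstance.producerId)); // true
        System.out.println("zombie write accepted: " + leader.write(oldInstance));           // false
    }
}
```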
Transactional Producers enable atomic writes across multiple partitions. A transactional producer begins a transaction, sends messages to multiple partitions, and then commits or aborts the transaction atomically. The Transaction Coordinator (the broker that leads the __transaction_state partition to which the transactional.id hashes) manages the transaction state and ensures that the records of all partitions involved in the transaction are either all committed or all aborted; aborted records remain in the log but are skipped by read_committed consumers. The transaction state is stored in the internal __transaction_state topic, and transaction markers (control records) are written to the affected partitions to indicate the transaction's outcome.
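Which broker acts as coordinator is determined by hashing the transactional.id onto a partition of __transaction_state. The sketch below follows that idea using the default transaction.state.log.num.partitions of 50 and an abs() that maps Integer.MIN_VALUE to 0, as Kafka's Utils.abs does; treat it as an illustration of the mapping rather than a guaranteed copy of the broker's code.

```java
// Hypothetical sketch of mapping a transactional.id to its coordinator partition.
final class CoordinatorLookupSketch {
    static final int DEFAULT_TXN_STATE_PARTITIONS = 50; // transaction.state.log.num.partitions default

    // Like Kafka's Utils.abs: Integer.MIN_VALUE maps to 0 instead of staying negative.
    static int abs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    // Partition of __transaction_state whose leader is the coordinator for this ID.
    static int coordinatorPartition(String transactionalId, int numPartitions) {
        return abs(transactionalId.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        for (String id : new String[] {"order-producer-1", "order-producer-2"}) {
            System.out.printf("%s -> __transaction_state-%d%n",
                    id, coordinatorPartition(id, DEFAULT_TXN_STATE_PARTITIONS));
        }
    }
}
```

Because the mapping depends only on the ID and the partition count, the partition count of __transaction_state should not be changed once the cluster is in use.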
Consumer Isolation Levels determine how consumers read transactional data. The read_committed isolation level ensures that consumers only see non-transactional records and records from committed transactions, while read_uncommitted (the default) returns all records regardless of transaction status. This isolation relies on the Last Stable Offset (LSO): the offset of the first record of the earliest still-open transaction on the partition, or the high watermark if no transaction is open. A read_committed consumer fetches only up to the LSO and uses the broker's aborted-transaction index to skip records from aborted transactions, providing a consistent view of the data.
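The LSO and read_committed filtering can be modeled on a toy single-partition log. Assumptions: every data record is transactional, all records are already replicated (so the log end stands in for the high watermark), and filtering is computed directly from the markers instead of from the broker's aborted-transaction index. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical single-partition log; all data records are transactional and fully replicated.
final class ReadCommittedSketch {
    enum Kind { DATA, COMMIT_MARKER, ABORT_MARKER }

    static final class Entry {
        final long offset;
        final long pid;
        final Kind kind;

        Entry(long offset, long pid, Kind kind) {
            this.offset = offset;
            this.pid = pid;
            this.kind = kind;
        }
    }

    // LSO: first offset of the earliest open transaction, or the log end if none is open.
    static long lastStableOffset(List<Entry> log) {
        Map<Long, Long> firstOpenOffset = new HashMap<>();
        for (Entry e : log) {
            if (e.kind == Kind.DATA) {
                firstOpenOffset.putIfAbsent(e.pid, e.offset);
            } else {
                firstOpenOffset.remove(e.pid); // a marker closes this PID's transaction
            }
        }
        long logEnd = log.isEmpty() ? 0 : log.get(log.size() - 1).offset + 1;
        return firstOpenOffset.values().stream().min(Long::compare).orElse(logEnd);
    }

    // Offsets a read_committed consumer receives: below the LSO and from committed transactions.
    static List<Long> readCommitted(List<Entry> log) {
        long lso = lastStableOffset(log);
        Map<Long, List<Long>> pending = new HashMap<>();
        Set<Long> committed = new HashSet<>();
        for (Entry e : log) {
            if (e.kind == Kind.DATA) {
                pending.computeIfAbsent(e.pid, k -> new ArrayList<>()).add(e.offset);
            } else {
                List<Long> offsets = pending.remove(e.pid);
                if (offsets != null && e.kind == Kind.COMMIT_MARKER) {
                    committed.addAll(offsets);
                }
            }
        }
        List<Long> visible = new ArrayList<>();
        for (Entry e : log) {
            if (e.offset < lso && committed.contains(e.offset)) {
                visible.add(e.offset);
            }
        }
        return visible;
    }

    public static void main(String[] args) {
        List<Entry> log = Arrays.asList(
                new Entry(0, 1, Kind.DATA),
                new Entry(1, 2, Kind.DATA),
                new Entry(2, 1, Kind.DATA),
                new Entry(3, 1, Kind.COMMIT_MARKER),
                new Entry(4, 2, Kind.ABORT_MARKER),
                new Entry(5, 3, Kind.DATA),          // PID 3's transaction is still open
                new Entry(6, 1, Kind.DATA),
                new Entry(7, 1, Kind.COMMIT_MARKER));
        System.out.println("LSO = " + lastStableOffset(log));  // LSO = 5
        System.out.println("visible = " + readCommitted(log)); // visible = [0, 2]
    }
}
```

Offset 6 belongs to a committed transaction but is not yet visible: it sits above the LSO, which stays at 5 until PID 3's transaction finishes.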
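Transaction Coordinator Protocol: The coordinator uses a two-phase commit protocol. While the transaction is ongoing, the producer registers each partition with the coordinator (AddPartitionsToTxn), and consumed offsets sent via sendOffsetsToTransaction() are written to __consumer_offsets as pending transactional offsets. Phase 1 (Prepare): on commitTransaction(), the coordinator writes PrepareCommit to __transaction_state; once this record is replicated, the outcome is decided. Phase 2 (Complete): the coordinator writes COMMIT markers to every partition in the transaction, including the __consumer_offsets partition (which makes the pending offsets visible), and then writes CompleteCommit. If the coordinator fails between phases, a new coordinator reads __transaction_state, finds the PrepareCommit (or PrepareAbort) record, and finishes writing the markers. This ensures that transactions are never left in an indeterminate state.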
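A minimal sketch of the commit path and its recovery, assuming a single transaction and modeling __transaction_state as a list and the per-partition markers as a map. The partition names and all class and method names are hypothetical. It shows why the PrepareCommit record is the commit point: once it is logged, a new coordinator can always finish phase 2.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of commit-side 2PC and coordinator recovery for one transaction.
final class TwoPhaseCommitSketch {
    enum TxnLogRecord { ONGOING, PREPARE_COMMIT, COMPLETE_COMMIT }

    final List<TxnLogRecord> transactionLog = new ArrayList<>();          // stand-in for __transaction_state
    final Map<String, Boolean> commitMarkerWritten = new LinkedHashMap<>(); // partition -> marker present

    TwoPhaseCommitSketch(List<String> partitions) {
        transactionLog.add(TxnLogRecord.ONGOING);
        for (String p : partitions) {
            commitMarkerWritten.put(p, false);
        }
    }

    // Runs both phases; "crashes" after crashAfterMarkers markers (pass -1 for no crash).
    boolean commit(int crashAfterMarkers) {
        transactionLog.add(TxnLogRecord.PREPARE_COMMIT);  // phase 1: the decision is now durable
        int written = 0;
        for (Map.Entry<String, Boolean> marker : commitMarkerWritten.entrySet()) {
            if (written == crashAfterMarkers) {
                return false;                              // coordinator dies during phase 2
            }
            marker.setValue(true);                         // phase 2: COMMIT marker to partition
            written++;
        }
        transactionLog.add(TxnLogRecord.COMPLETE_COMMIT);
        return true;
    }

    // A new coordinator replays the log and rolls a prepared commit forward.
    void recover() {
        TxnLogRecord last = transactionLog.get(transactionLog.size() - 1);
        if (last == TxnLogRecord.PREPARE_COMMIT) {
            for (Map.Entry<String, Boolean> marker : commitMarkerWritten.entrySet()) {
                marker.setValue(true);                     // re-sending a marker is harmless in this model
            }
            transactionLog.add(TxnLogRecord.COMPLETE_COMMIT);
        }
        // If the last record is ONGOING, nothing was decided: the producer retries or the txn times out.
    }

    public static void main(String[] args) {
        TwoPhaseCommitSketch txn = new TwoPhaseCommitSketch(
                Arrays.asList("order-events-0", "order-events-1", "__consumer_offsets-12"));
        System.out.println("committed without crash: " + txn.commit(1)); // false
        System.out.println("markers before recovery: " + txn.commitMarkerWritten);
        txn.recover();
        System.out.println("markers after recovery:  " + txn.commitMarkerWritten);
        System.out.println("log: " + txn.transactionLog);
    }
}
```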
End-to-End Exactly-Once: Achieving exactly-once semantics end-to-end requires: (1) an idempotent producer (prevents duplicates at the source), (2) a transactional producer that commits the consumed offsets in the same transaction as its output (ensures atomicity of multi-partition writes and of input progress), (3) consumers with read_committed (ensures only committed records are processed), and (4) idempotent sink operations (prevents duplicates at destinations outside Kafka). Kafka Streams provides (1) through (3) out of the box with processing.guarantee=exactly_once_v2, which configures these components automatically; effects on external systems still need to be idempotent.
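Why the consumed offsets must be committed inside the producer's transaction can be seen in a small simulation with one injected crash between writing output and committing input offsets. In a real application, the atomic step corresponds to calling KafkaProducer.sendOffsetsToTransaction() before commitTransaction(); the run() function and its parameters here are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation: one crash after producing output but before committing input offsets.
final class ReadProcessWriteSketch {
    static List<String> run(List<String> input, boolean transactional) {
        List<String> outputTopic = new ArrayList<>(); // what a read_committed reader sees
        int committedOffset = 0;                      // consumer group's committed position
        boolean crashPending = true;
        while (committedOffset < input.size()) {
            List<String> openTxn = new ArrayList<>(); // writes of the current transaction
            for (int i = committedOffset; i < input.size(); i++) {
                String out = "processed-" + input.get(i);
                if (transactional) {
                    openTxn.add(out);                 // invisible until the commit marker
                } else {
                    outputTopic.add(out);             // visible as soon as it is acknowledged
                }
            }
            if (crashPending) {
                crashPending = false;                 // crash: offsets not committed, open txn aborted
                continue;                             // restart from the last committed offset
            }
            // Commit point: output records and the new offset become visible together.
            outputTopic.addAll(openTxn);
            committedOffset = input.size();
        }
        return outputTopic;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b");
        System.out.println("at-least-once: " + run(input, false)); // [processed-a, processed-b, processed-a, processed-b]
        System.out.println("transactional: " + run(input, true));  // [processed-a, processed-b]
    }
}
```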
Key Concepts Table
| Component | Description | Key Configurations | Failure Mode |
|---|---|---|---|
| Producer ID (PID) | Unique identifier for a producer session | transactional.id (keeps the PID stable across restarts) | Transactional: same PID with bumped epoch on restart; idempotent-only: new PID on restart |
| Epoch | Fencing mechanism for producer instances | Incremented by initTransactions() and on timeout abort | Fence stale producers |
| Sequence Number | Per-partition, per-PID message ordering | Auto-incremented | Duplicate detection, gap rejection |
| Transaction Coordinator | Broker leading the transactional.id's __transaction_state partition | Automatic (partition leadership) | Transaction recovery from __transaction_state |
| Transaction Markers | Control records (COMMIT/ABORT) in partition log | Automatic | Abort uncommitted |
| LSO (Last Stable Offset) | First offset of the earliest open transaction (high watermark if none) | Automatic | Consumer filtering |
| Deduplication Table | Per-partition PID/epoch/last sequence plus metadata of the last 5 batches | max.in.flight.requests.per.connection (at most 5) | Duplicate rejection |
| Fencing | Preventing stale producer writes | Automatic | Split-brain prevention |
| Idempotent Producer | Prevents duplicate writes | enable.idempotence (enabled by default in recent clients) | Retry safety |
| Transactional Producer | Atomic multi-partition writes | transactional.id | Atomicity guarantee |
Code Examples
Idempotent and Transactional Producer with Detailed Configuration
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.UUID;

public class IdempotentProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Bootstrap servers
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "kafka-broker-0:9092,kafka-broker-1:9092,kafka-broker-2:9092");
        // Serializers
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Enable idempotence (critical for exactly-once)
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // Required for idempotence: acks=all, retries > 0, max.in.flight <= 5
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        // Setting transactional.id makes this a transactional producer (multi-partition atomicity).
        // Keep it stable across restarts of this logical producer so that zombies are fenced.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-producer-1");
        // Performance tuning
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Registers the transactional.id with the coordinator and fences older instances
        producer.initTransactions();
        try {
            // Begin transaction for atomic writes
            producer.beginTransaction();
            for (int i = 0; i < 1000; i++) {
                String key = "order-" + (i % 100);
                String value = "{\"orderId\": \"" + i + "\", \"amount\": " + (Math.random() * 1000) + "}";
                ProducerRecord<String, String> record = new ProducerRecord<>("order-events", key, value);
                // Add metadata headers (informational only; they do not affect delivery semantics)
                record.headers().add("correlation-id",
                        UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
                record.headers().add("idempotent", "true".getBytes(StandardCharsets.UTF_8));
                // Send with callback for monitoring; a failed send also makes commitTransaction() throw
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("Failed: " + exception.getMessage());
                    } else {
                        System.out.printf("Sent to partition %d, offset %d, timestamp %d%n",
                                metadata.partition(), metadata.offset(), metadata.timestamp());
                    }
                });
            }
            // Commit transaction atomically
            producer.commitTransaction();
            System.out.println("Transaction committed successfully");
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal errors: the producer cannot abort; close it (and create a new one if needed)
            System.err.println("Fatal producer error, closing: " + e.getMessage());
        } catch (KafkaException e) {
            // Abortable errors: roll back so none of this transaction's records become visible
            producer.abortTransaction();
            System.err.println("Transaction aborted: " + e.getMessage());
        } finally {
            producer.close();
        }
    }
}
Consumer with Transactional Read Isolation
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;

public class TransactionalConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "kafka-broker-0:9092,kafka-broker-1:9092,kafka-broker-2:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Critical: skip records from aborted and still-open transactions
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        // Disable auto-commit for manual offset management
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Fetch configuration
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe with rebalance listener
        consumer.subscribe(Arrays.asList("order-events"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("Partitions revoked: " + partitions);
                // Commit the positions of records the poll loop has already processed
                consumer.commitSync(Duration.ofSeconds(30));
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("Partitions assigned: " + partitions);
            }
        });
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (records.isEmpty()) continue;
                // Process records
                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
                for (ConsumerRecord<String, String> record : records) {
                    processRecord(record);
                    // Track the next offset to read for the manual commit
                    offsetsToCommit.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1, "processed"));
                }
                // Commit after processing. A crash between processing and this commit causes
                // redelivery, so this loop is at-least-once and processRecord must be idempotent.
                if (!offsetsToCommit.isEmpty()) {
                    consumer.commitSync(offsetsToCommit, Duration.ofSeconds(30));
                }
            }
        } finally {
            consumer.close();
        }
    }

    private static void processRecord(ConsumerRecord<String, String> record) {
        System.out.printf("Processing committed record: topic=%s, partition=%d, offset=%d, key=%s%n",
                record.topic(), record.partition(), record.offset(), record.key());
        // Business logic here. With read_committed, this record is either non-transactional
        // or part of a committed transaction; it is never from an aborted one.
    }
}
Kafka Streams with Exactly-Once Semantics
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;
import java.time.Duration;
import java.util.Properties;

public class ExactlyOnceStreamExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "exactly-once-stream-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        // Exactly-once v2: one transactional producer per stream thread (requires brokers 2.5+)
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        // State store configuration
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
        // Commit (= transaction) interval; 100 ms is already the default under exactly-once
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);

        StreamsBuilder builder = new StreamsBuilder();
        // Source stream
        KStream<String, String> sourceStream = builder.stream("input-events",
                Consumed.with(Serdes.String(), Serdes.String()));

        // Stateful windowed count; store updates and changelog writes join the same transaction
        KTable<Windowed<String>, Long> windowedCounts = sourceStream
                .groupByKey()
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
                .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("windowed-count-store")
                        .withValueSerde(Serdes.Long()));

        // Branch by priority (split() replaces the deprecated branch(Predicate...)); all atomic per transaction
        sourceStream
                .filter((key, value) -> value != null)
                .split()
                .branch((key, value) -> value.contains("HIGH"),
                        Branched.withConsumer(stream -> stream.to("high-priority-events",
                                Produced.with(Serdes.String(), Serdes.String()))))
                .branch((key, value) -> value.contains("NORMAL"),
                        Branched.withConsumer(stream -> stream.to("normal-priority-events",
                                Produced.with(Serdes.String(), Serdes.String()))))
                .noDefaultBranch();

        // Stream-table join; output is committed together with the input offsets
        KTable<String, String> referenceData = builder.table("reference-data",
                Consumed.with(Serdes.String(), Serdes.String()));
        KStream<String, String> enrichedStream = sourceStream.join(referenceData,
                (event, ref) -> event + ":" + ref,
                Joined.with(Serdes.String(), Serdes.String(), Serdes.String()));
        enrichedStream.to("enriched-events", Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Graceful shutdown; local state is kept so a restart does not rebuild stores from changelogs
        Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close(Duration.ofSeconds(30))));
        streams.start();
    }
}
Transaction Coordinator Monitoring Script
#!/bin/bash
# Monitor Kafka transaction coordinators and transaction state
# (kafka-transactions.sh ships with Kafka 3.0 and later)
BOOTSTRAP_SERVER="kafka-broker-0:9092"
echo "=== KAFKA TRANSACTION MONITORING ==="
echo "Timestamp: $(date)"
echo ""
# Decode transaction metadata records from the internal log.
# This formatter class is the one shipped with Kafka 2.x/3.x; newer releases moved it.
echo "--- Transactional ID Records (__transaction_state) ---"
kafka-console-consumer.sh \
  --bootstrap-server "$BOOTSTRAP_SERVER" \
  --topic __transaction_state \
  --from-beginning \
  --max-messages 100 \
  --formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter"
echo ""
echo "--- Transaction Coordinator Status ---"
# Describe state, PID, epoch, and partitions for a specific transactional ID
kafka-transactions.sh \
  --bootstrap-server "$BOOTSTRAP_SERVER" \
  describe \
  --transactional-id order-producer-1
echo ""
echo "--- Transaction State ---"
# List transactional IDs known to the coordinators
kafka-transactions.sh \
  --bootstrap-server "$BOOTSTRAP_SERVER" \
  list
echo ""
echo "--- Producer State (PID / epoch) ---"
# Active producers on one partition: PID, epoch, last sequence, open transaction start offset
kafka-transactions.sh \
  --bootstrap-server "$BOOTSTRAP_SERVER" \
  describe-producers \
  --topic order-events \
  --partition 0
Performance Metrics
| Configuration | Throughput (msg/sec) | Latency (p99) | Duplicate Rate | Memory Usage |
|---|---|---|---|---|
| No Idempotence | 500,000 | 5ms | 0.1% | 32MB |
| Idempotent (acks=all) | 450,000 | 8ms | 0% | 64MB |
| Transactional (single partition) | 400,000 | 12ms | 0% | 128MB |
| Transactional (multi-partition) | 300,000 | 20ms | 0% | 128MB |
| Kafka Streams (EOS v1) | 200,000 | 25ms | 0% | 256MB |
| Kafka Streams (EOS v2) | 250,000 | 18ms | 0% | 256MB |
| Consumer (read_committed) | 100,000 | 2ms | 0% | 32MB |
| Consumer (read_uncommitted) | 150,000 | 1ms | 0% | 32MB |
Best Practices
- Idempotence First: Always enable enable.idempotence=true before considering transactions. Idempotence provides duplicate prevention per partition with minimal overhead.
- Transactional Scope: Keep transactions as small as possible. Long-running transactions increase end-to-end latency (read_committed consumers cannot read past the first offset of an open transaction), memory usage, and the risk of timeout. Aim for transactions that complete within seconds.
- Fencing Awareness: Give each concurrently running producer its own transactional.id, and keep that ID stable across restarts of the same logical producer so that initTransactions() fences its predecessor. Never let two live application instances share a transactional.id; the newer one fences the older.
- Consumer Isolation: Always use isolation.level=read_committed for consumers reading transactional data. Use read_uncommitted only when you need to see uncommitted data, for example when debugging.
- Error Handling: On abortable errors, call abortTransaction() and retry the transaction. On fatal errors (ProducerFencedException, OutOfOrderSequenceException, AuthorizationException), do not abort; close the producer and create a new one. Monitor transaction coordinator health and handle coordinator failover gracefully.
- Performance Tuning: Enable compression (compression.type=lz4) to reduce network overhead. Tune batch.size and linger.ms to balance latency against throughput. Keep max.in.flight.requests.per.connection at 5, the maximum allowed with idempotence, for the best pipelining.
- Monitoring: Track producer metrics such as record-retry-rate, record-error-rate, txn-commit-time-ns-total, and txn-abort-time-ns-total; consumer records-lag-max (measured against the LSO under read_committed); and long-running or hanging transactions on the brokers (kafka-transactions.sh find-hanging).
- Testing: Test exactly-once semantics with a consumer that tracks message IDs and checks for both duplicates and gaps. Verify that reprocessing the same data produces identical results. Use chaos testing to simulate producer restarts and network partitions.
- Migration: Idempotence is a producer setting, not a topic property, so producers can be switched over one at a time rather than simultaneously. Roll out to a subset of producers first and monitor for fencing events and OutOfOrderSequenceException errors.
- Kafka Streams EOS: Use processing.guarantee=exactly_once_v2 for new applications (brokers 2.5+). Compared with v1, v2 uses one transactional producer per stream thread instead of one per task, which reduces the number of producers and transactions and therefore the overhead. Keep state.dir on fast storage (SSD) for optimal performance.
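For the Testing practice above, a hypothetical checker that compares the message IDs a test producer sent with those a read_committed consumer observed might look like this; it reports both duplicates and gaps, since exactly-once requires neither. The class and method names are illustrative only.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical test helper: compares sent message IDs with IDs a read_committed consumer observed.
final class ExactlyOnceChecker {
    static final class Report {
        final Set<String> duplicates;
        final Set<String> missing;

        Report(Set<String> duplicates, Set<String> missing) {
            this.duplicates = duplicates;
            this.missing = missing;
        }

        boolean exactlyOnce() {
            return duplicates.isEmpty() && missing.isEmpty();
        }
    }

    static Report check(Collection<String> sentIds, List<String> observedIds) {
        Set<String> seen = new HashSet<>();
        Set<String> duplicates = new TreeSet<>();
        for (String id : observedIds) {
            if (!seen.add(id)) {
                duplicates.add(id);        // observed more than once
            }
        }
        Set<String> missing = new TreeSet<>(sentIds);
        missing.removeAll(seen);           // sent but never observed
        return new Report(duplicates, missing);
    }

    public static void main(String[] args) {
        Report r = check(Arrays.asList("m1", "m2", "m3"), Arrays.asList("m1", "m2", "m2"));
        System.out.println("duplicates=" + r.duplicates + " missing=" + r.missing); // duplicates=[m2] missing=[m3]
    }
}
```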
Key Takeaways:
- EOS = idempotent producers (per-partition dedup) + transactional producers (atomic multi-partition writes) + read_committed consumers
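- Idempotent deduplication: broker appends a batch only if seq = lastSeq(PID) + 1; retries matching one of the last 5 retained batches are acknowledged as duplicates, so max.in.flight.requests.per.connection must be at most 5
- Transaction coordinator uses two-phase commit: write PrepareCommit to __transaction_state (the commit point), then COMMIT markers to every partition, including __consumer_offsets for offsets sent with sendOffsetsToTransaction()
- Producer fencing via epoch bump prevents split-brain: restarting with the same transactional.id yields the same PID with a higher epoch, which invalidates the old instance
- End-to-end EOS requires idempotent sink operations at the destination
- EOS v2 (processing.guarantee=exactly_once_v2) lowers overhead vs v1 by using one transactional producer per stream thread instead of one per task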
See also: Data Engineering Pipeline patterns (data-engineering/019) | PySpark Structured Streaming (pyspark/11-structured-streaming)