Apache Kafka Producer & Consumer: Advanced Configurations and Patterns
Architecture Diagram: Producer Message Flow
[Diagram: Kafka producer message flow and internal architecture]
- Application layer: Records 1-5 enter the producer (K1/V1, K2/V2, K1/V3, K3/V4, K2/V5).
- Interceptors & serializers: Producer Interceptor (logging/metrics) -> Key Serializer (String/Avro) -> Value Serializer (JSON/Avro).
- Partitioning & routing: Partitioner (default/custom, MurmurHash2) -> Partition Selector (round robin) -> Record Batch Accumulator.
- Network I/O layer: Sender Thread (async send) -> In-Flight Requests Queue (max.in.flight) -> Network Client (NIO) -> Response Handler.
- Brokers: Broker 0 to Broker 3, each hosting the leader of Partition 0 to Partition 3 respectively.
Architecture Diagram: Consumer Group Rebalancing Process
[Diagram: consumer group rebalancing state machine]
- Phase 1, Prepare Rebalance (trigger: consumer join/leave): Consumers 1-4 (Consumer 4 is the new consumer) enter PREPARING and send a JoinGroup request. Members: [consumer-1, consumer-2, consumer-3, consumer-4]. Group protocol: CooperativeStickyAssignor. Subscription: [order-events, payment-events].
- Phase 2, Sync Group (partition assignment): Group Coordinator (Broker 0), assignment strategy CooperativeStickyAssignor. Consumer 1 <- Partition 0, Partition 4. Consumer 2 <- Partition 1, Partition 5. Consumer 3 <- Partition 2. Consumer 4 <- Partition 3.
- Phase 3, Stable (normal operation): all four consumers are STABLE with their assigned partitions and send a heartbeat every 3s.
Architecture Diagram: Offset Management Flow
[Diagram: offset management and dual-write architecture]
- Consumer processing pipeline: POLL (fetch records) -> DECODE (deserialize records) -> PROCESS (business logic) -> VALIDATE (schema validation) -> ENRICH (cache lookup) -> COMMIT (offset to Kafka, stored in __consumer_offsets).
- Option 1, auto-commit: enable.auto.commit=true, auto.commit.interval.ms=5000. Warning: may lose messages.
- Option 2, manual sync: commitSync() after processing, every N records or time-based. Reliable, simpler.
- Option 3, transactional: beginTransaction(), process records, commitTransaction(). Exactly-once semantics within Kafka.
- Option 4, external storage: store offsets in a database (MySQL, PostgreSQL, Redis) together with idempotent writes, so offsets are combined with the DB writes. Complete control.
- Option 5, offsets in the producer transaction: a producer transaction that carries the consumer offsets in the same transaction as the output records.
Formal Definitions
Definition: Producer Batching
Producer batching is the accumulation of multiple records into a single network request before sending to the broker. The RecordAccumulator buffers records per partition, and the Sender thread drains these buffers when either batch.size is reached or linger.ms expires. Batching amortizes network overhead and enables compression across multiple records, significantly improving throughput at the cost of slight latency increase.
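As a rough illustration of the two batching triggers, the sketch below collects the relevant producer settings using plain string keys. The class name `BatchingConfig` and the specific values (64 KB batches, 20 ms linger, lz4, 32 MB buffer) are assumptions chosen for illustration, not measured recommendations.

```java
import java.util.Properties;

// Hypothetical helper: collects batching-related producer settings.
class BatchingConfig {
    static Properties batchingProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed local broker
        // A batch is sent once it holds this many bytes for one partition...
        props.setProperty("batch.size", String.valueOf(64 * 1024));
        // ...or once it has waited this long, whichever comes first.
        props.setProperty("linger.ms", "20");
        // Compression is applied per batch, so fuller batches tend to compress better.
        props.setProperty("compression.type", "lz4");
        // Total memory available to the accumulator across all partitions.
        props.setProperty("buffer.memory", String.valueOf(32L * 1024 * 1024));
        return props;
    }
}
```

These properties can be handed to a `KafkaProducer` constructor once the key and value serializers are added.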
Definition: Consumer Lag
Consumer lag is the difference between the latest offset in a partition log and the consumer's last committed offset. It measures how far behind a consumer is from the producer's write position. Consumer lag = log_end_offset - committed_offset. Persistent lag indicates the consumer cannot keep up with the production rate, requiring scaling or optimization.
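The lag definition can be sketched as a small calculation over per-partition offsets. `LagCalculator` is a hypothetical helper; it assumes partitions are identified by integer id and that a partition with no committed offset counts from offset 0.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating lag = log_end_offset - committed_offset.
class LagCalculator {
    static Map<Integer, Long> lagPerPartition(Map<Integer, Long> endOffsets,
                                              Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            // Assumption: no committed offset means nothing has been consumed yet.
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            lag.put(e.getKey(), Math.max(0L, e.getValue() - committed));
        }
        return lag;
    }

    // Sum of lag over all partitions of the topic.
    static long totalLag(Map<Integer, Long> lag) {
        long sum = 0;
        for (long v : lag.values()) {
            sum += v;
        }
        return sum;
    }
}
```

With a real consumer, the two input maps would typically come from `KafkaConsumer.endOffsets()` and `KafkaConsumer.committed()`.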
Definition: Idempotent Producer
An idempotent producer prevents duplicate messages during retries: the broker assigns each producer a Producer ID (PID), and the producer attaches per-partition sequence numbers to its batches. The broker rejects any batch whose sequence number it has already seen, which gives exactly-once delivery per partition across retries within a single producer session. A restarted producer receives a new PID, so deduplication across restarts requires a transactional.id.
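The sequence-number check can be pictured with a toy model. This is a simplified sketch, not the broker's actual implementation: `SequenceTracker` and its `Result` enum are hypothetical names, and it tracks one sequence number per record rather than per batch.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of broker-side deduplication keyed by (producer id, partition).
class SequenceTracker {
    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    // Last accepted sequence number per "producerId:partition".
    private final Map<String, Integer> lastSeq = new HashMap<>();

    Result tryAppend(long producerId, int partition, int sequence) {
        String key = producerId + ":" + partition;
        int last = lastSeq.getOrDefault(key, -1);
        if (sequence <= last) {
            return Result.DUPLICATE;      // a retry of something already written
        }
        if (sequence != last + 1) {
            return Result.OUT_OF_ORDER;   // a gap: an earlier write is missing
        }
        lastSeq.put(key, sequence);
        return Result.APPENDED;
    }
}
```

A retried send reuses its original sequence number, so the second attempt is classified as a duplicate instead of being appended again.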
Key Formulas
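Consumer Processing Rate
$$R_{consume} = \min\left(\frac{N_{poll}}{T_{process}},\ \frac{B_{fetch}}{T_{fetch}}\right)$$
Here,
- $N_{poll}$ = Records per poll (max.poll.records)
- $T_{process}$ = Time to process one poll batch
- $B_{fetch}$ = Fetch buffer size (fetch.max.bytes)
- $T_{fetch}$ = Fetch round-trip time
The fetch term is in bytes per second; divide it by the average record size to compare it with the processing term in records per second.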
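A worked instance of this bound, as a sketch: consumer throughput is limited by the slower of the processing rate (records per poll divided by batch processing time) and the fetch rate (fetch buffer size divided by fetch round-trip time). The class `ConsumerThroughput` and the `avgRecordBytes` parameter are assumptions introduced here to convert the fetch rate from bytes to records.

```java
// Hypothetical helper: upper bound on consumer throughput in records per second.
class ConsumerThroughput {
    static double recordsPerSecond(int recordsPerPoll, double processSeconds,
                                   long fetchBytes, double fetchRttSeconds,
                                   int avgRecordBytes) {
        // Rate at which the application works through polled batches.
        double processingRate = recordsPerPoll / processSeconds;
        // Fetch bandwidth in bytes/sec, converted to records/sec (assumed record size).
        double fetchRate = (fetchBytes / fetchRttSeconds) / avgRecordBytes;
        return Math.min(processingRate, fetchRate);
    }
}
```

For example, 500 records per poll processed in 0.25 s gives a processing rate of 2,000 records/sec; with a 50 MB fetch buffer, a 50 ms round trip, and 1 KB records, the fetch side is far faster, so processing is the bottleneck.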
Consumer Lag Growth Rate
$$\frac{d\,\mathrm{Lag}}{dt} = R_{produce} - R_{consume}$$
Here,
- $R_{produce}$ = Message production rate (msg/sec)
- $R_{consume}$ = Message consumption rate (msg/sec)
When consumer lag grows continuously (dLag/dt > 0), the consumer is falling behind. Solutions: (1) increase partition count and consumer instances, (2) optimize processing logic, (3) increase max.poll.records, (4) reduce per-record processing time.
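The growth-rate relation also tells how long a backlog takes to clear once consumption outpaces production. The sketch below assumes constant rates; `LagGrowth` is a hypothetical helper.

```java
// Hypothetical helper for reasoning about lag under constant rates (msg/sec).
class LagGrowth {
    // dLag/dt = production rate - consumption rate.
    static double growthRate(double produceRate, double consumeRate) {
        return produceRate - consumeRate;
    }

    // Seconds until the backlog is cleared, or infinity if the consumer never catches up.
    static double secondsToDrain(long currentLag, double produceRate, double consumeRate) {
        if (currentLag <= 0) {
            return 0.0; // nothing to drain
        }
        double rate = growthRate(produceRate, consumeRate);
        if (rate >= 0) {
            return Double.POSITIVE_INFINITY; // lag is flat or growing
        }
        return currentLag / -rate;
    }
}
```

With a backlog of 60,000 messages, production at 10,000 msg/sec, and consumption at 12,000 msg/sec, the lag shrinks by 2,000 msg/sec and clears in 30 seconds.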
The acks setting controls the durability-latency tradeoff: acks=0 (fire-and-forget, lowest latency, no durability), acks=1 (leader acknowledgment, moderate), acks=all (all ISR acknowledge, highest durability, highest latency). For exactly-once, use acks=all with idempotence enabled.
Detailed Explanation
The Kafka Producer API is a sophisticated client library designed for high-throughput, low-latency message publishing. At its core, the producer operates on an asynchronous, batching model where records are accumulated in a memory buffer before being sent in batches to the appropriate brokers. The producer maintains a RecordAccumulator that buffers records per partition, and a background Sender thread that drains these buffers and sends batched requests to the brokers. This architecture minimizes network overhead and enables efficient compression, as multiple records can be compressed together in a single batch.
The Partitioning Strategy is critical for both ordering and load distribution. For records with a key, Kafka by default uses a murmur2 hash of the serialized key modulo the number of partitions, ensuring that records with the same key always go to the same partition (and thus maintain ordering) as long as the partition count does not change. Records without a key are spread across partitions; since Kafka 2.4 the default does this with sticky partitioning, which fills a batch for one partition before switching to another. Custom partitioners can be implemented for more sophisticated routing, such as region-aware partitioning. The partitioner is invoked after serialization but before batching, and the result determines which partition's accumulator buffer receives the record.
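The hash-then-modulo step can be sketched without the Kafka client library. Note the substitution: Kafka's default partitioner hashes with murmur2, while this sketch uses `Arrays.hashCode` as a stand-in, so the partition numbers it produces will not match Kafka's. `KeyPartitioning` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of keyed partition selection (stand-in hash, not murmur2).
class KeyPartitioning {
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        // Clear the sign bit so the modulo result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }
}
```

The same key always maps to the same partition for a fixed partition count. Changing the partition count changes the mapping, which is why adding partitions to a topic can break per-key ordering for existing keys.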
Producer Configurations heavily influence behavior. The acks setting determines durability guarantees: acks=0 provides no acknowledgment (fire-and-forget), acks=1 acknowledges when the leader writes to its local log, and acks=all acknowledges when all in-sync replicas write the record. The max.in.flight.requests.per.connection setting controls how many unacknowledged requests can be outstanding per broker connection, with values > 1 potentially causing reordering (unless idempotence is enabled). The retries and retry.backoff.ms settings control automatic retry behavior, and with idempotence enabled, the producer will automatically retry with deduplication to avoid duplicates.
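A configuration sketch that combines these settings for durable, ordered delivery is shown below. The class name `DurableProducerConfig` and the timeout values are illustrative assumptions; the constraint it respects is that idempotence requires acks=all, retries greater than zero, and at most 5 in-flight requests per connection.

```java
import java.util.Properties;

// Hypothetical helper: producer settings for durability with safe retries.
class DurableProducerConfig {
    static Properties durableProps() {
        Properties props = new Properties();
        // Wait for all in-sync replicas before a send counts as successful.
        props.setProperty("acks", "all");
        // Deduplicate retried batches on the broker.
        props.setProperty("enable.idempotence", "true");
        // Up to 5 in-flight requests keep ordering when idempotence is on.
        props.setProperty("max.in.flight.requests.per.connection", "5");
        // Retry until delivery.timeout.ms expires rather than a fixed count.
        props.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        props.setProperty("retry.backoff.ms", "100");
        props.setProperty("delivery.timeout.ms", "120000");
        return props;
    }
}
```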
Consumer Group Coordination is managed by a Group Coordinator broker that maintains the group's membership state and partition assignments. When a consumer joins or leaves, a rebalance is triggered. It proceeds in two phases: in the JoinGroup phase, consumers send JoinGroup requests with their subscriptions and the coordinator elects a leader consumer (typically the first to join); in the SyncGroup phase, the leader computes the partition assignment and all consumers send SyncGroup requests to receive their assignments. The CooperativeStickyAssignor (recommended) performs incremental rebalancing, only revoking partitions that need to move, minimizing the impact on processing continuity.
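To make the assignment step concrete, here is a toy round-robin assignment of partitions to group members. It is deliberately not the CooperativeStickyAssignor algorithm (which also tries to keep existing ownership); `ToyAssignor` is a hypothetical name and only shows the kind of mapping that is computed during a rebalance.

```java
import java.util.ArrayList;
import java.util.List;

// Toy assignor: partition p goes to consumer (p mod consumerCount).
class ToyAssignor {
    static List<List<Integer>> assign(int partitionCount, int consumerCount) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            assignment.get(p % consumerCount).add(p);
        }
        return assignment;
    }
}
```

With six partitions and four consumers, the first two consumers receive two partitions each and the other two receive one each. With more consumers than partitions, the extra consumers receive nothing and sit idle.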
Offset Management is crucial for at-least-once and exactly-once processing semantics. Offsets can be committed automatically (enable.auto.commit=true) or manually via commitSync() or commitAsync(). Manual commits provide more control, allowing consumers to commit only after successful processing. For transactional systems, offsets can be committed atomically with the business operation using sendOffsetsToTransaction(). The __consumer_offsets topic stores committed offsets, and the consumer reads these on startup to resume from the last committed position. The auto.offset.reset setting determines behavior when no committed offset exists: earliest reads from the beginning, latest reads from the end, and none throws an exception.
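Two details in this paragraph are easy to get wrong: the committed offset is the offset of the next record to read (last processed plus one), and auto.offset.reset only applies when no committed offset exists. The sketch below models both with a hypothetical `OffsetResume` helper; it does not call the Kafka client.

```java
// Hypothetical model of commit and resume positions.
class OffsetResume {
    // The committed offset points at the NEXT record to read.
    static long offsetToCommit(long lastProcessedOffset) {
        return lastProcessedOffset + 1;
    }

    // Where a consumer starts reading; committedOffset is null when none exists.
    static long startPosition(Long committedOffset, String autoOffsetReset,
                              long logStartOffset, long logEndOffset) {
        if (committedOffset != null) {
            return committedOffset; // the reset policy is ignored in this case
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset;
            case "latest":
                return logEndOffset;
            default:
                throw new IllegalStateException(
                    "no committed offset and auto.offset.reset=" + autoOffsetReset);
        }
    }
}
```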
Consumer Polling and Processing follows a single-threaded model where poll() fetches batches of records, and the application processes them sequentially. The max.poll.records setting controls how many records are returned per poll, while max.poll.interval.ms sets the maximum time between polls before the consumer is considered failed and removed from the group. This design prevents slow consumers from blocking the group but requires careful tuning of processing time vs. poll interval. The isolation.level setting determines transactional visibility: read_uncommitted sees all records, while read_committed only sees records from committed transactions.
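The tuning relationship between batch size and poll interval is simple arithmetic: a full batch must be processed before max.poll.interval.ms elapses. The sketch below assumes a roughly constant per-record processing time; `PollBudget` and the `safety` fraction are hypothetical.

```java
// Hypothetical helper for sizing max.poll.records against max.poll.interval.ms.
class PollBudget {
    // Worst-case time to process one full poll batch, in milliseconds.
    static long worstCaseBatchMillis(int maxPollRecords, long perRecordMillis) {
        return (long) maxPollRecords * perRecordMillis;
    }

    // True if a full batch can be processed before the poll interval expires.
    static boolean fitsPollInterval(int maxPollRecords, long perRecordMillis,
                                    long maxPollIntervalMs) {
        return worstCaseBatchMillis(maxPollRecords, perRecordMillis) < maxPollIntervalMs;
    }

    // Largest max.poll.records that uses at most `safety` (0..1) of the interval.
    static int safeMaxPollRecords(long perRecordMillis, long maxPollIntervalMs,
                                  double safety) {
        long budgetMillis = (long) (maxPollIntervalMs * safety);
        return (int) Math.max(1L, budgetMillis / perRecordMillis);
    }
}
```

With the default max.poll.records of 500 and max.poll.interval.ms of 300000, records that take 200 ms each fit comfortably (100 s per batch), while records that take 700 ms each do not (350 s per batch) and would get the consumer removed from the group.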
Key Concepts Table
| Concept | Description | Configuration | Impact |
|---|---|---|---|
| Record Accumulator | In-memory buffer for batching records | buffer.memory, batch.size | Throughput vs. Memory |
| Sender Thread | Background thread sending batches | linger.ms, request.timeout.ms | Latency vs. Throughput |
| Partitioner | Determines target partition for record | partitioner.class | Ordering, Load Distribution |
| Idempotence | Prevents duplicate records on retry | enable.idempotence | Exactly-once per partition |
| Transactional | Atomic multi-partition writes | transactional.id | Exactly-once across partitions |
| Group Coordinator | Broker managing consumer groups | N/A | Rebalance coordination |
| Heartbeat | Consumer liveness signal | heartbeat.interval.ms | Failure detection speed |
| Session Timeout | Consumer failure threshold | session.timeout.ms | False positive vs. Detection |
| Rebalance Listener | Callback for partition changes | ConsumerRebalanceListener passed to subscribe() | Rebalance handling |
| Isolation Level | Transactional read semantics | isolation.level | Consistent reads |
Code Examples
Advanced Producer with Custom Partitioner
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;

public class CustomPartitionerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // Use custom partitioner
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
            RegionAwarePartitioner.class.getName());

        // try-with-resources closes the producer even if sending throws
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                String region = "us-east-" + (i % 3);
                // The Partitioner interface sees only key and value, not headers,
                // so the region is encoded as a key prefix
                ProducerRecord<String, String> record = new ProducerRecord<>(
                    "user-events",
                    region + ":user-" + (i % 10),
                    "{\"userId\": " + i + ", \"action\": \"click\"}"
                );
                // Headers carry metadata for downstream consumers
                record.headers().add("region", region.getBytes(StandardCharsets.UTF_8));
                record.headers().add("priority",
                    (i % 5 == 0 ? "high" : "normal").getBytes(StandardCharsets.UTF_8));
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("Error: " + exception.getMessage());
                    } else {
                        System.out.printf("Partition: %d, Offset: %d%n",
                            metadata.partition(), metadata.offset());
                    }
                });
            }
            producer.flush();
        }
    }
}

// Custom partitioner implementation
class RegionAwarePartitioner implements Partitioner {
    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // No key: pick a random partition
        if (keyBytes == null) {
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        // Region-aware: distribute by region, then by key hash.
        // Utils.toPositive clears the sign bit so the modulo is never negative.
        String region = extractRegionFromKey(key.toString());
        int regionHash = Utils.toPositive(region.hashCode()) % numPartitions;
        // Within region, use key hash
        int keyHash = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        // Combine for final partition
        return (regionHash + keyHash) % numPartitions;
    }

    private String extractRegionFromKey(String key) {
        // Keys look like "<region>:<user>"; keys without a prefix fall back to "other"
        int sep = key.indexOf(':');
        return sep > 0 ? key.substring(0, sep) : "other";
    }

    @Override
    public void close() {}
}
Consumer with Manual Offset Management and Retry
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

public class ManualOffsetConsumerExample {
    private static final int MAX_RETRIES = 3;
    private static final Map<TopicPartition, Long> failedOffsets = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "retry-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("order-events"), new RebalanceHandler());
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
                // Handle each partition separately so that a failure in one
                // partition does not skip unprocessed records from the others
                for (TopicPartition partition : records.partitions()) {
                    for (ConsumerRecord<String, String> record : records.records(partition)) {
                        try {
                            processRecordWithRetry(record);
                            // Track successful offset (offset of the next record to read)
                            offsetsToCommit.put(partition,
                                new OffsetAndMetadata(record.offset() + 1, "processed"));
                        } catch (Exception e) {
                            System.err.printf("Failed to process record at offset %d: %s%n",
                                record.offset(), e.getMessage());
                            // Track failed offset for later retry
                            failedOffsets.put(partition, record.offset());
                            // Don't commit this offset; stop processing this partition only
                            break;
                        }
                    }
                }
                // Commit only successful offsets
                if (!offsetsToCommit.isEmpty()) {
                    consumer.commitSync(offsetsToCommit, Duration.ofSeconds(30));
                }
                // Rewind failed partitions so the next poll re-delivers the failed records
                retryFailedRecords(consumer);
            }
        } finally {
            consumer.close();
        }
    }

    private static void processRecordWithRetry(ConsumerRecord<String, String> record)
            throws Exception {
        int retries = 0;
        while (retries < MAX_RETRIES) {
            try {
                // Simulate processing: every 10th offset always fails
                if (record.offset() % 10 == 0) {
                    throw new RuntimeException("Simulated processing error");
                }
                System.out.printf("Processed: %s%n", record.value());
                return;
            } catch (Exception e) {
                retries++;
                if (retries >= MAX_RETRIES) {
                    throw e;
                }
                // Exponential backoff: 1s, then 2s. Keep the total well below
                // max.poll.interval.ms or the consumer is removed from the group.
                Thread.sleep(1000L << (retries - 1));
            }
        }
    }

    private static void retryFailedRecords(KafkaConsumer<String, String> consumer) {
        if (failedOffsets.isEmpty()) return;
        System.out.printf("Retrying %d failed records%n", failedOffsets.size());
        // Seek to failed offset for retry. A record that always fails is retried
        // on every poll; production code would route it to a dead-letter topic.
        failedOffsets.forEach((partition, offset) -> consumer.seek(partition, offset));
        failedOffsets.clear();
    }

    static class RebalanceHandler implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            System.out.println("Partitions revoked: " + partitions);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println("Partitions assigned: " + partitions);
        }
    }
}
Transactional Producer-Consumer Example
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;

public class ExactlyOnceProducerConsumerExample {
    public static void main(String[] args) {
        // Transactional producer that reads from one topic and writes to another
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "eos-transformer-1");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "eos-transformer-group");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        producer.initTransactions();
        consumer.subscribe(Arrays.asList("input-events"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (records.isEmpty()) continue;
                producer.beginTransaction();
                try {
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (ConsumerRecord<String, String> record : records) {
                        // Transform the record
                        String transformedValue = transform(record.value());
                        // Write to output topic
                        ProducerRecord<String, String> outputRecord = new ProducerRecord<>(
                            "output-events",
                            record.key(),
                            transformedValue
                        );
                        // Add transformation metadata
                        outputRecord.headers().add("source-topic",
                            record.topic().getBytes(StandardCharsets.UTF_8));
                        outputRecord.headers().add("source-offset",
                            String.valueOf(record.offset()).getBytes(StandardCharsets.UTF_8));
                        outputRecord.headers().add("transformed-at",
                            String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
                        producer.send(outputRecord);
                        // Remember the next offset to read for every consumed partition
                        offsets.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
                    }
                    // Commit offsets atomically with the transaction. Do not call
                    // consumer.commitSync() here: that would commit outside the transaction.
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();
                    System.out.printf("Committed transaction for %d records%n", records.count());
                } catch (ProducerFencedException | OutOfOrderSequenceException
                         | AuthorizationException e) {
                    // Fatal errors: the producer cannot recover and is closed in finally
                    throw e;
                } catch (Exception e) {
                    producer.abortTransaction();
                    // Rewind so the aborted batch is polled and processed again
                    resetToLastCommitted(consumer);
                    System.err.println("Transaction aborted: " + e.getMessage());
                }
            }
        } finally {
            producer.close();
            consumer.close();
        }
    }

    private static void resetToLastCommitted(KafkaConsumer<String, String> consumer) {
        Set<TopicPartition> assignment = consumer.assignment();
        Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(assignment);
        for (TopicPartition tp : assignment) {
            OffsetAndMetadata offset = committed.get(tp);
            if (offset != null) {
                consumer.seek(tp, offset.offset());
            } else {
                // Nothing committed yet: matches auto.offset.reset=earliest
                consumer.seekToBeginning(Collections.singleton(tp));
            }
        }
    }

    private static String transform(String value) {
        // Example transformation: add processed timestamp before the final brace
        // (assumes the value is a non-empty JSON object)
        int end = value.lastIndexOf('}');
        if (end < 0) return value;
        return value.substring(0, end) + ", \"processedAt\": " + System.currentTimeMillis() + "}";
    }
}
Performance Metrics
| Scenario | Throughput (msg/sec) | Latency (p99) | Memory Usage | CPU Usage |
|---|---|---|---|---|
| Async Batch (16KB) | 500,000 | 5ms | 64MB | 30% |
| Sync (acks=all) | 50,000 | 50ms | 32MB | 15% |
| Idempotent | 450,000 | 8ms | 64MB | 35% |
| Transactional | 300,000 | 15ms | 128MB | 40% |
| Compression (gzip) | 200,000 | 10ms | 16MB | 50% |
| Compression (lz4) | 400,000 | 7ms | 32MB | 25% |
| Consumer (single) | 100,000 | 2ms | 32MB | 20% |
| Consumer (group=4) | 400,000 | 5ms | 128MB | 80% |
Best Practices
- Batch Size Optimization: Increase batch.size to 64KB-128KB for higher throughput, and use linger.ms=10-50 to allow batching. This trades slight latency for significantly higher throughput.
- Idempotent Producers: Always enable enable.idempotence=true in production. This provides exactly-once semantics per partition with negligible performance overhead. Combine with acks=all and max.in.flight.requests.per.connection=5 (the maximum that idempotence allows).
- Transaction Management: Use transactional producers when operations span multiple partitions/topics. Keep transaction scope small and avoid long-running transactions that block others.
- Consumer Group Sizing: Match consumer count to partition count. More consumers than partitions wastes resources, because the extra consumers stay idle. Use CooperativeStickyAssignor for minimal disruption during rebalances.
- Offset Commit Strategy: Disable auto-commit and commit manually after processing. Use commitSync() for critical offsets and commitAsync() for bulk commits. Never commit offsets for unprocessed records.
- Error Handling: Implement retry logic with exponential backoff for transient failures. Use a dead-letter queue (DLQ) for records that fail after max retries. Monitor consumer lag to detect processing bottlenecks.
- Memory Management: Configure max.partition.fetch.bytes to control memory usage per partition. Set fetch.min.bytes and fetch.max.wait.ms to batch fetch requests efficiently.
- Monitoring: Track producer metrics: record-send-rate, batch-size-avg, compression-rate-avg, request-latency-avg. Track consumer metrics: records-lag-max, records-consumed-rate, poll-rate.
- Security: Use SASL/SCRAM for authentication and SSL for encryption. Implement ACLs to restrict topic access. Use separate producers for different security zones.
- Testing: Use Testcontainers or embedded Kafka for integration tests. Test with realistic volumes and verify exactly-once semantics with idempotent consumer writes.
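The error-handling practice above (exponential backoff, then a dead-letter queue) can be sketched as two small decisions. `RetryBackoff`, the base delay, and the cap are hypothetical choices for illustration; the sketch only computes delays and the dead-letter decision and does not publish anything.

```java
// Hypothetical helper: capped exponential backoff and dead-letter decision.
class RetryBackoff {
    // Delay before retry number `attempt` (1-based): base * 2^(attempt - 1), capped.
    static long backoffMillis(int attempt, long baseMillis, long capMillis) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        // Limit the shift so large attempt counts cannot overflow typical base delays.
        int shift = Math.min(attempt - 1, 30);
        return Math.min(capMillis, baseMillis << shift);
    }

    // After maxRetries failed attempts the record should go to the dead-letter queue.
    static boolean shouldDeadLetter(int failedAttempts, int maxRetries) {
        return failedAttempts >= maxRetries;
    }
}
```

With a 1 s base and a 30 s cap, the delays are 1 s, 2 s, 4 s, and so on until the cap is reached. Capping keeps the total retry time bounded, which matters for staying within max.poll.interval.ms.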
Key Takeaways:
- Producer batching (batch.size + linger.ms) trades latency for throughput; typical optimal batch size is 64KB-128KB
- Consumer lag = log_end_offset - committed_offset; sustained growth indicates consumer cannot keep up
- Idempotent producers (enable.idempotence=true) prevent duplicates per partition without transactions
- Partition assignment via CooperativeStickyAssignor minimizes rebalance disruption
- Manual offset commits after processing prevent data loss (at-least-once delivery); auto-commit can lose or duplicate records on failure, depending on when the commit fires relative to processing
- Consumer throughput is bounded by min(poll_rate × records_per_poll, fetch_rate)
See also: Data Engineering Pipeline patterns (data-engineering/019) | PySpark Structured Streaming (pyspark/11-structured-streaming)