Kafka Performance Tuning
Difficulty: Senior Level | Companies: LinkedIn, Uber, Netflix, Spotify, Confluent
Performance tuning is critical for achieving high throughput and low latency in Kafka. Understanding the trade-offs between these two goals is essential.
Performance Dimensions
Performance Dimensions:
├── Throughput (messages/sec or bytes/sec)
├── Latency (time from produce to consume)
├── Durability (data safety guarantees)
└── Availability (uptime requirements)
Trade-offs:
├── Higher throughput → Higher latency
├── Stronger durability → Lower throughput
└── Higher availability → More resources
Producer Tuning
Batching Configuration
# Producer batching settings
batch.size=16384 # 16KB default batch size
linger.ms=5 # Wait 5ms to fill batch
buffer.memory=33554432 # 32MB buffer
max.in.flight.requests.per.connection=5
# Compression
compression.type=lz4 # or snappy, gzip, zstd
# Rough throughput estimate (assumes each partition ships one full batch per linger interval):
# Throughput ≈ batch.size * (1000 / linger.ms) * num_partitions
# Example: 16384 * (1000/5) * 10 = 32,768,000 bytes/sec ≈ 31 MB/s
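To make the estimate above concrete, here is a minimal Python sketch of the arithmetic. The function names and the sample message size and rates are illustrative assumptions, not Kafka APIs. The second function relies on documented producer behavior: a batch is sent when it reaches batch.size or when linger.ms expires, whichever comes first.

```python
def estimated_throughput_bytes(batch_size, linger_ms, num_partitions):
    """Estimate from the formula above: one full batch per partition per linger interval."""
    return batch_size * (1000 / linger_ms) * num_partitions


def batch_wait_ms(batch_size, linger_ms, msg_bytes, msgs_per_sec):
    """Approximate time a batch waits before it is sent, for a single partition.

    Simplified model (an assumption for illustration): messages arrive at a
    steady rate and compression is ignored.
    """
    fill_ms = batch_size / (msg_bytes * msgs_per_sec) * 1000
    return min(linger_ms, fill_ms)


if __name__ == "__main__":
    est = estimated_throughput_bytes(16384, 5, 10)
    print(f"{est:,.0f} bytes/sec = {est / 1024 / 1024:.2f} MiB/s")
    # Hypothetical load: 1 KiB messages at two per-partition arrival rates.
    for rate in (1_000, 10_000):
        wait = batch_wait_ms(16384, 5, 1024, rate)
        print(f"{rate} msg/s -> batch waits about {wait:.1f} ms")
```

At the lower rate the batch cannot fill within 5 ms, so linger.ms bounds the added latency and batches ship partly empty. At the higher rate the batch fills first, and raising linger.ms would change nothing unless batch.size is raised as well.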
Java Producer Configuration
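import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
Properties props = new Properties();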
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Throughput optimization
props.put("batch.size", 65536); // 64KB batches
props.put("linger.ms", 10); // Wait up to 10ms to fill a batch
props.put("buffer.memory", 67108864); // 64MB buffer
props.put("compression.type", "lz4");
props.put("max.in.flight.requests.per.connection", 5);
// Reliability (can trade for throughput)
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
props.put("enable.idempotence", true);
Producer<String, String> producer = new KafkaProducer<>(props);
// Benchmark producer
long startTime = System.currentTimeMillis();
int messageCount = 1000000;
for (int i = 0; i < messageCount; i++) {
    producer.send(new ProducerRecord<>("benchmark", "key-" + i, "value-" + i));
}
producer.flush();
long endTime = System.currentTimeMillis();
double throughput = (double) messageCount / ((endTime - startTime) / 1000.0);
System.out.printf("Throughput: %.2f messages/sec%n", throughput);
Python Producer Optimization
from kafka import KafkaProducer
import time
producer = KafkaProducer(
    bootstrap_servers=['kafka1:9092', 'kafka2:9092'],
    batch_size=65536,  # 64KB batches
    linger_ms=10,  # Wait up to 10ms to fill a batch
    buffer_memory=67108864,  # 64MB buffer
    compression_type='lz4',
    acks='all',
    retries=2147483647,
    max_in_flight_requests_per_connection=5
)
# Benchmark
start_time = time.time()
message_count = 1000000
for i in range(message_count):
    producer.send(
        topic='benchmark',
        key=f'key-{i}'.encode('utf-8'),
        value=f'value-{i}'.encode('utf-8')
    )
producer.flush()
end_time = time.time()
throughput = message_count / (end_time - start_time)
print(f"Throughput: {throughput:.2f} messages/sec")
Consumer Tuning
Fetch Configuration
# Consumer fetch settings
fetch.min.bytes=1 # Wait for at least 1 byte
fetch.max.wait.ms=500 # Max wait time
max.partition.fetch.bytes=1048576 # 1MB per partition
fetch.max.bytes=52428800 # 50MB total fetch
# Session settings
session.timeout.ms=30000
heartbeat.interval.ms=10000
max.poll.interval.ms=300000
max.poll.records=500 # Records per poll
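The settings above interact, and a little arithmetic shows how. The sketch below derives three rough budgets from them. The function name, the dictionary layout, and the `partitions_per_fetch` input are assumptions made for illustration; only the property names and values come from the configuration above.

```python
def consumer_budgets(cfg, partitions_per_fetch):
    """Derive rough budgets from consumer settings keyed by Kafka property name."""
    return {
        # Average time available per record before max.poll.interval.ms is exceeded
        "per_record_budget_ms": cfg["max.poll.interval.ms"] / cfg["max.poll.records"],
        # Approximate cap on one fetch response; a single oversized batch can still exceed it
        "fetch_response_cap_bytes": min(
            cfg["fetch.max.bytes"],
            cfg["max.partition.fetch.bytes"] * partitions_per_fetch,
        ),
        # Common guidance: heartbeat interval no more than one third of the session timeout
        "heartbeat_within_third": cfg["heartbeat.interval.ms"] * 3 <= cfg["session.timeout.ms"],
    }


# Values copied from the fetch and session settings above
SETTINGS = {
    "fetch.max.bytes": 52428800,
    "max.partition.fetch.bytes": 1048576,
    "session.timeout.ms": 30000,
    "heartbeat.interval.ms": 10000,
    "max.poll.interval.ms": 300000,
    "max.poll.records": 500,
}

if __name__ == "__main__":
    # partitions_per_fetch=10 is a hypothetical assignment size
    for name, value in consumer_budgets(SETTINGS, partitions_per_fetch=10).items():
        print(f"{name}: {value}")
```

With these values a full poll of 500 records leaves about 600 ms per record on average. If processing is slower than that, the consumer misses max.poll.interval.ms and the group rebalances, so either lower max.poll.records or raise max.poll.interval.ms.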
Java Consumer Optimization
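import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("group.id", "performance-consumer");
// Deserializers are required; the consumer fails at construction without them
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", false);
// Start from the beginning so messages produced before the first run are counted
props.put("auto.offset.reset", "earliest");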
// Fetch optimization
props.put("fetch.min.bytes", 1);
props.put("fetch.max.wait.ms", 500);
props.put("max.partition.fetch.bytes", 1048576); // 1MB
props.put("max.poll.records", 500);
// Session optimization
props.put("session.timeout.ms", 30000);
props.put("heartbeat.interval.ms", 10000);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("benchmark"));
// Benchmark consumer
long startTime = System.currentTimeMillis();
long totalMessages = 0;
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (records.count() == 0) continue;
    totalMessages += records.count();
    // Process records (processRecord is an application-specific handler)
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);
    }
    // Commit offsets
    consumer.commitSync();
    // Check if benchmark complete
    if (totalMessages >= 1000000) break;
}
long endTime = System.currentTimeMillis();
double throughput = (double) totalMessages / ((endTime - startTime) / 1000.0);
System.out.printf("Consumer throughput: %.2f messages/sec%n", throughput);
Broker Tuning
Server Configuration
# Broker performance settings
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# Log configuration
log.retention.hours=168
log.segment.bytes=1073741824 # 1GB segments
log.retention.check.interval.ms=300000
log.cleaner.enable=true
# Replication
replica.fetch.wait.max.ms=500
replica.fetch.min.bytes=1
replica.fetch.max.bytes=10485760
num.replica.fetchers=4
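The retention and segment settings above also determine how much data each broker keeps and how many segment files each partition accumulates. A back-of-the-envelope sketch follows; the function names, the 10 MiB/s ingest rate, the replication factor of 3, and the partition count are hypothetical inputs rather than values from this article. Actual usage can run somewhat higher, because Kafka deletes only whole closed segments.

```python
import math


def retained_bytes(ingest_bytes_per_sec, retention_hours, replication_factor):
    """Cluster-wide bytes on disk once retention is in steady state (uncompressed estimate)."""
    return ingest_bytes_per_sec * retention_hours * 3600 * replication_factor


def segments_per_partition(ingest_bytes_per_sec, retention_hours, partitions, segment_bytes):
    """Approximate number of segment files kept per partition replica."""
    per_partition = ingest_bytes_per_sec * retention_hours * 3600 / partitions
    return math.ceil(per_partition / segment_bytes)


if __name__ == "__main__":
    ingest = 10 * 1024 * 1024  # hypothetical 10 MiB/s into one topic
    total = retained_bytes(ingest, retention_hours=168, replication_factor=3)
    print(f"Retained on disk: {total / 1024 ** 4:.1f} TiB")
    count = segments_per_partition(ingest, 168, partitions=10, segment_bytes=1073741824)
    print(f"Segments per partition: {count}")
```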
JVM Tuning
# Kafka JVM settings
export KAFKA_HEAP_OPTS="-Xmx4g -Xms4g"
export KAFKA_JVM_PERFORMANCE_OPTS="
-server
-XX:+UseG1GC
-XX:MaxGCPauseMillis=20
-XX:InitiatingHeapOccupancyPercent=35
-XX:+ExplicitGCInvokesConcurrent
-Djava.awt.headless=true
"
Performance Benchmarks
Throughput Test
import time
from kafka import KafkaProducer, KafkaConsumer
class KafkaBenchmark:
    def __init__(self, bootstrap_servers):
        self.bootstrap_servers = bootstrap_servers
    def producer_throughput(self, topic, message_count, batch_size=16384):
        producer = KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            batch_size=batch_size,
            linger_ms=5,
            compression_type='lz4'
        )
        start = time.time()
        for i in range(message_count):
            producer.send(topic, value=f'message-{i}'.encode('utf-8'))
        producer.flush()
        end = time.time()
        producer.close()
        throughput = message_count / (end - start)
        print(f"Producer throughput: {throughput:.2f} msg/sec")
        return throughput
    def consumer_throughput(self, topic, consumer_group, duration=60):
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=self.bootstrap_servers,
            group_id=consumer_group,
            auto_offset_reset='earliest'
        )
        start = time.time()
        total_messages = 0
        while time.time() - start < duration:
            records = consumer.poll(timeout_ms=1000)
            total_messages += sum(len(records[tp]) for tp in records)
        # Use the measured time: the last poll can overshoot the requested duration
        elapsed = time.time() - start
        consumer.close()
        throughput = total_messages / elapsed
        print(f"Consumer throughput: {throughput:.2f} msg/sec")
        return throughput
# Run benchmark
benchmark = KafkaBenchmark(['kafka1:9092'])
benchmark.producer_throughput('benchmark', 1000000)
benchmark.consumer_throughput('benchmark', 'benchmark-group')
Performance Monitoring
import time
from prometheus_client import Gauge, Histogram
# Performance metrics
producer_throughput = Gauge(
    'kafka_producer_throughput',
    'Producer throughput in messages/sec'
)
consumer_throughput = Gauge(
    'kafka_consumer_throughput',
    'Consumer throughput in messages/sec'
)
message_latency = Histogram(
    'kafka_message_latency_seconds',
    'Message latency from produce to consume',
    buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0]
)
batch_size = Gauge(
    'kafka_batch_size',
    'Average batch size in bytes'
)
def measure_latency(producer, topic, key, value, consumer, timeout_s=10.0):
    """Measure end-to-end latency; returns None if the message is not seen within timeout_s."""
    start = time.time()
    # Produce
    producer.send(topic, key=key, value=value)
    producer.flush()
    # Consume until the message with the matching key arrives or the timeout expires
    while time.time() - start < timeout_s:
        records = consumer.poll(timeout_ms=100)
        for tp, messages in records.items():
            for message in messages:
                if message.key == key:
                    latency = time.time() - start
                    message_latency.observe(latency)
                    return latency
    return None
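Single latency readings are noisy, so end-to-end latency is usually reported as percentiles over many samples. The sketch below summarizes a list of latencies such as those returned by repeated calls to measure_latency. The helper names and the nearest-rank method are choices made for this illustration and are not part of any Kafka or Prometheus API.

```python
import math


def percentile(samples, pct):
    """Nearest-rank percentile of a non-empty list of numbers."""
    if not samples:
        raise ValueError("samples must not be empty")
    ordered = sorted(samples)
    # Nearest-rank: the smallest value with at least pct percent of samples at or below it
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def summarize_latencies(latencies_s):
    """Return p50, p95, and p99 in milliseconds for latencies given in seconds."""
    return {
        f"p{pct}_ms": percentile(latencies_s, pct) * 1000
        for pct in (50, 95, 99)
    }


if __name__ == "__main__":
    # Hypothetical samples: 1 ms to 100 ms in 1 ms steps
    samples = [i / 1000 for i in range(1, 101)]
    for name, value in summarize_latencies(samples).items():
        print(f"{name}: {value:.1f}")
```

Tail percentiles such as p99 tend to reveal batching and GC effects that the mean hides, which is why the histogram buckets above extend to 1 second.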
Performance Optimization Checklist
Producer Optimization:
✓ Increase batch.size (16KB → 64KB or higher)
✓ Set linger.ms (5-10ms for throughput)
✓ Enable compression (lz4 for speed, zstd for ratio)
✓ Increase buffer.memory (32MB → 64MB)
✓ Use idempotent producers (enable.idempotence=true)
Consumer Optimization:
✓ Tune max.poll.records (default 500; raise it only if each record is cheap to process)
✓ Set fetch.min.bytes (1 for low latency, higher for throughput)
✓ Optimize max.partition.fetch.bytes (1MB default)
✓ Use cooperative rebalancing (CooperativeStickyAssignor)
Broker Optimization:
✓ Increase num.network.threads (4-8)
✓ Increase num.io.threads (8-16)
✓ Optimize socket buffers (128KB+)
✓ Use G1GC with appropriate heap size
✓ Enable log compaction for appropriate topics
Key Insight: Always benchmark with realistic data and workloads. Theoretical maximums may not match real-world performance due to network, disk, and application overhead.
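Compression is a good example of why realistic data matters: the ratio depends on the payload, and because the producer compresses whole batches, it also depends on how full those batches are. The sketch below uses Python's standard zlib module as a stand-in for Kafka's gzip codec (lz4 and zstd need third-party packages). The function name and the JSON sample messages are made up for illustration.

```python
import json
import zlib


def compression_ratios(messages):
    """Return (per-message ratio, per-batch ratio) for a list of byte strings."""
    raw = sum(len(m) for m in messages)
    # Each message compressed on its own, as if every batch held one record
    individually = sum(len(zlib.compress(m)) for m in messages)
    # All messages compressed together, as in one full batch
    batched = len(zlib.compress(b"".join(messages)))
    return raw / individually, raw / batched


if __name__ == "__main__":
    # Hypothetical event payloads with repeated field names
    messages = [
        json.dumps({"user_id": i, "event": "page_view", "path": "/home"}).encode("utf-8")
        for i in range(500)
    ]
    per_message, per_batch = compression_ratios(messages)
    print(f"Ratio when compressed one by one: {per_message:.2f}x")
    print(f"Ratio when compressed as a batch: {per_batch:.2f}x")
```

Running the same measurement on a sample of production messages gives a better sense of what compression.type and a larger batch.size are likely to buy than any synthetic payload.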
Follow-Up Questions
- What is the trade-off between batch.size and linger.ms?
- How does compression affect CPU usage and throughput?
- Explain how fetch.min.bytes impacts consumer latency.
- What JVM settings would you use for a Kafka broker with 64GB RAM?
- How would you diagnose and fix a sudden drop in producer throughput?