Exactly-Once with Kafka Streams

Difficulty: Senior Level | Companies: LinkedIn, Uber, Netflix, Spotify, Confluent

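Exactly-once semantics (EOS) in Kafka Streams ensures that the effects of processing each input record (output records and state store updates) are committed exactly once, even when an instance crashes and records are reprocessed. It is built on the idempotent producer, on producer transactions that atomically include output writes, changelog writes, and consumer offset commits, and on read_committed consumers that ignore data from aborted transactions.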

Exactly-Once Architecture

Processing Guarantee Levels (processing.guarantee):
├── at_least_once (default)
│   └── No records lost, but records may be reprocessed and duplicated on failure
├── exactly_once (Kafka 0.11+, deprecated in 3.0)
│   └── One transactional producer per task
└── exactly_once_v2 (Kafka Streams 3.0+; requires brokers 2.5+)
    └── One transactional producer per stream thread; each record's effects committed exactly once

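How it works:
1. Consumer reads records from the input topics
2. Producer writes results to output topics (and state store changelogs)
3. Consumer offsets are committed through the producer
4. Steps 2 and 3 happen in a single atomic transaction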
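A toy Python model makes the difference between the guarantees concrete. It is hypothetical and simplified (not Kafka client code): one input record, an optional crash on the first attempt, and a restart that resumes from the committed offset. At-most-once is included only for contrast; Kafka Streams does not offer it as a processing.guarantee value. The "transaction" in the exactly_once branch is just a local buffer published together with the offset.

```python
# Toy model, not Kafka code: one input record, one optional crash,
# then a restart that resumes from the committed offset.

def run(strategy: str, crash_on_first_attempt: bool) -> list:
    inputs = ["order-1"]
    visible_output = []      # what a downstream reader would see
    committed_offset = 0     # next input offset to read after a restart

    def attempt(crash: bool) -> None:
        nonlocal committed_offset
        result = f"processed:{inputs[committed_offset]}"
        if strategy == "at_most_once":
            committed_offset += 1              # commit the offset first...
            if crash:
                return                         # ...so a crash loses the record
            visible_output.append(result)
        elif strategy == "at_least_once":
            visible_output.append(result)      # write the output first...
            if crash:
                return                         # ...so a crash leaves the offset uncommitted
            committed_offset += 1
        elif strategy == "exactly_once":
            pending_output = [result]          # buffered in the open transaction
            pending_offset = committed_offset + 1
            if crash:
                return                         # abort: neither output nor offset is published
            visible_output.extend(pending_output)  # commit publishes both together
            committed_offset = pending_offset
        else:
            raise ValueError(f"unknown strategy: {strategy}")

    attempt(crash=crash_on_first_attempt)
    if committed_offset < len(inputs):         # restart if the record is still pending
        attempt(crash=False)
    return visible_output


for s in ("at_most_once", "at_least_once", "exactly_once"):
    print(s, run(s, crash_on_first_attempt=True))
```

With a crash, at-most-once prints an empty list, at-least-once prints the result twice, and exactly-once prints it once. Without a crash, all three print it once.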

Transaction Flow

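1. Streams application begins a transaction
2. Consumer polls records from the input partitions
3. Processors transform records and update state stores
4. Producer sends results to output topics and state changes to changelog topics
5. Consumer offsets are added to the transaction (sendOffsetsToTransaction)
6. Transaction commits atomically (once per commit.interval.ms)
7. All-or-nothing: outputs, changelog writes, and offsets become visible together, or, on abort, none of them do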
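The flow above can be sketched as a small in-memory model. `ToyTransactionalProducer` is hypothetical, not the Kafka producer API; its method names only loosely mirror `beginTransaction`, `send`, `sendOffsetsToTransaction`, `commitTransaction`, and `abortTransaction`. It shows that output records and offsets buffered in one transaction become visible together or not at all. As a simplification, aborted records are dropped here, whereas real Kafka appends them to the log and marks them aborted so read_committed consumers skip them.

```python
class ToyTransactionalProducer:
    """In-memory model of transactional writes (hypothetical, not the Kafka API)."""

    def __init__(self) -> None:
        self.output_log = []            # committed output records
        self.committed_offsets = {}     # partition -> next offset to read
        self._in_txn = False
        self._pending_records = []
        self._pending_offsets = {}

    def begin_transaction(self) -> None:
        if self._in_txn:
            raise RuntimeError("a transaction is already open")
        self._in_txn = True

    def send(self, value: str) -> None:
        self._require_open()
        self._pending_records.append(value)

    def send_offsets_to_transaction(self, offsets: dict) -> None:
        self._require_open()
        self._pending_offsets.update(offsets)

    def commit_transaction(self) -> None:
        self._require_open()
        # All-or-nothing: records and offsets are published in one step.
        self.output_log.extend(self._pending_records)
        self.committed_offsets.update(self._pending_offsets)
        self._close()

    def abort_transaction(self) -> None:
        self._require_open()
        self._close()                   # discard both records and offsets

    def _require_open(self) -> None:
        if not self._in_txn:
            raise RuntimeError("no open transaction")

    def _close(self) -> None:
        self._in_txn = False
        self._pending_records = []
        self._pending_offsets = {}


producer = ToyTransactionalProducer()

# First attempt fails and is aborted: nothing becomes visible.
producer.begin_transaction()
producer.send("processed:order-1")
producer.send_offsets_to_transaction({"input-orders-0": 1})
producer.abort_transaction()
print(producer.output_log, producer.committed_offsets)   # [] {}

# Retry commits the output and the offset together.
producer.begin_transaction()
producer.send("processed:order-1")
producer.send_offsets_to_transaction({"input-orders-0": 1})
producer.commit_transaction()
print(producer.output_log, producer.committed_offsets)   # ['processed:order-1'] {'input-orders-0': 1}
```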

Java Kafka Streams Exactly-Once

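import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

import java.time.Duration;
import java.util.Properties;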

public class ExactlyOnceStreamProcessor {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "exactly-once-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
                  Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
                  Serdes.StringSerde.class);
        
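        // Critical: enable exactly-once processing (exactly_once_v2 requires brokers 2.5+)
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
                  StreamsConfig.EXACTLY_ONCE_V2);
        
        // Transaction timeout: the plain producer default is 60 s, but Kafka Streams
        // lowers it to 10 s under exactly-once, so set it explicitly if needed
        props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 
                  60000);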
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Source stream
        KStream<String, String> orders = builder.stream("input-orders");
        
        // Process with exactly-once
        KStream<String, String> processed = orders
            .filter((key, value) -> value != null)
            .mapValues(value -> enrichOrder(value))
            .peek((key, value) -> System.out.println("Processed: " + key));
        
        // Sink to output topic
        processed.to("output-orders");
        
        // Optional: Update state store (also within transaction)
        KTable<String, Long> orderCounts = orders
            .groupByKey()
            .count();
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        
        // Graceful shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close(Duration.ofSeconds(30));
        }));
    }
    
    private static String enrichOrder(String order) {
        // Transformation logic
        return order.toUpperCase();
    }
}

Python Equivalent with Faust

import faust

# Faust app with exactly-once
app = faust.App(
    'exactly-once-processor',
    broker='kafka://kafka1:9092,kafka2:9092',
    processing_guarantee='exactly_once',
    broker_max_poll_interval=300,
    topic_partitions=4
)

# Topics
input_topic = app.topic('input-orders', value_type=str)
output_topic = app.topic('output-orders', value_type=str)

# State store (table)
order_counts = app.Table(
    'order-counts',
    default=int,
    partitions=4
)

@app.agent(input_topic)
async def process_orders(stream):
    """Process orders with exactly-once semantics"""
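    async for order in stream:
        # Transform
        processed = transform_order(order)
        
        # Send to output topic
        await output_topic.send(value=processed)
        
        # Update state store (table). Faust expects a table to be updated
        # from a stream partitioned by the same key; repartition with
        # stream.group_by(...) if user_id is not the message key.
        user_id = extract_user_id(order)
        order_counts[user_id] += 1
        
        # With processing_guarantee='exactly_once', the output send, the
        # table's changelog write, and the offset commit are committed together.
        # Exceptions are not swallowed here: catching and continuing would let
        # the stream move past this order and commit its offset, silently
        # dropping it.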

def transform_order(order):
    """Transform order data"""
    return f"processed:{order}"

def extract_user_id(order):
    """Extract user ID from order"""
    # Your extraction logic
    return "user-123"

if __name__ == '__main__':
    app.main()

Transaction Management

Producer Transactions

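// Understanding transaction internals (plain producer API)
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.*;

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
props.put("transactional.id", "streams-processor-1");  // Stable across restarts, unique per producer
props.put("enable.idempotence", true);
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);

Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();  // Fences older producers with the same transactional.id

try {
    producer.beginTransaction();
    
    // Send records
    producer.send(new ProducerRecord<>("output", "key", "value"));
    
    // Commit consumer offsets within the transaction; the offset is the
    // NEXT offset to read (last processed offset + 1)
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    offsets.put(new TopicPartition("input", 0), new OffsetAndMetadata(100));
    // In a real read-process-write loop, pass consumer.groupMetadata()
    producer.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata("consumer-group"));
    
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal: this producer cannot continue, so close it instead of aborting
    producer.close();
} catch (KafkaException e) {
    // Recoverable: abort so none of this transaction's records or offsets become visible
    producer.abortTransaction();
}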
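This is why transactional.id must stay stable for a given logical producer: on initTransactions() the coordinator bumps an epoch for that id, and any older producer still holding the previous epoch (a "zombie") is rejected. The sketch below is a hypothetical in-memory model of that fencing rule, not broker code; `ProducerFencedError` merely stands in for Kafka's ProducerFencedException, and the model omits that the real coordinator also resolves the previous epoch's open transaction.

```python
class ProducerFencedError(Exception):
    """Stands in for Kafka's ProducerFencedException in this toy model."""


class ToyTransactionCoordinator:
    """Hypothetical model: one epoch counter per transactional.id."""

    def __init__(self) -> None:
        self._epochs = {}

    def init_transactions(self, transactional_id: str) -> int:
        # Each initialization bumps the epoch, fencing every older producer
        # that registered with the same transactional.id.
        epoch = self._epochs.get(transactional_id, -1) + 1
        self._epochs[transactional_id] = epoch
        return epoch

    def check_epoch(self, transactional_id: str, epoch: int) -> None:
        if epoch != self._epochs[transactional_id]:
            raise ProducerFencedError(
                f"{transactional_id}: epoch {epoch} has been fenced")


class ToyProducer:
    def __init__(self, coordinator: ToyTransactionCoordinator, transactional_id: str) -> None:
        self.coordinator = coordinator
        self.transactional_id = transactional_id
        self.epoch = coordinator.init_transactions(transactional_id)

    def commit_transaction(self) -> str:
        self.coordinator.check_epoch(self.transactional_id, self.epoch)
        return f"committed by epoch {self.epoch}"


coordinator = ToyTransactionCoordinator()
zombie = ToyProducer(coordinator, "streams-processor-1")       # old instance, e.g. stalled
replacement = ToyProducer(coordinator, "streams-processor-1")  # restarted instance
print(replacement.commit_transaction())  # committed by epoch 1
try:
    zombie.commit_transaction()
except ProducerFencedError as err:
    print("zombie rejected:", err)       # zombie rejected: streams-processor-1: epoch 0 has been fenced
```

If every instance used the same transactional.id, each new instance would fence the others; if an instance picked a new id on every restart, its old incarnation would never be fenced.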

Consumer Offset Management

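// Consumer side of an exactly-once read-process-write loop
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("group.id", "exactly-once-consumer");
props.put("enable.auto.commit", false);          // offsets go through the producer's transaction
props.put("isolation.level", "read_committed");  // only committed records, never past an open transaction

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("input-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        // Process record (processRecord is application-defined)
        processRecord(record);
    }
    
    // Don't call consumer.commitSync() here: offsets are committed via
    // producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata())
}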
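read_committed does two things: it skips records from aborted transactions, and it never reads past the last stable offset (LSO), the first offset belonging to a transaction that is still open. A hypothetical Python model of that rule (the `Entry` type, the transaction labels, and the helper functions are illustrative; real logs also contain transaction control markers, omitted here):

```python
from dataclasses import dataclass


@dataclass
class Entry:
    offset: int
    value: str
    txn: str          # hypothetical label of the transaction that wrote it


def last_stable_offset(log: list, txn_status: dict) -> int:
    """First offset that belongs to a still-open transaction (or the log end)."""
    open_offsets = [e.offset for e in log if txn_status[e.txn] == "open"]
    log_end = log[-1].offset + 1 if log else 0
    return min(open_offsets, default=log_end)


def read_committed(log: list, txn_status: dict) -> list:
    lso = last_stable_offset(log, txn_status)
    return [e.value for e in log
            if e.offset < lso and txn_status[e.txn] == "committed"]


def read_uncommitted(log: list) -> list:
    return [e.value for e in log]


log = [Entry(0, "a", "t1"), Entry(1, "b", "t2"), Entry(2, "c", "t3"), Entry(3, "d", "t1")]
status = {"t1": "committed", "t2": "aborted", "t3": "open"}
print(read_uncommitted(log))        # ['a', 'b', 'c', 'd']
print(read_committed(log, status))  # ['a']: 'b' aborted, 'c' open, 'd' waits behind 'c'
status["t3"] = "committed"
print(read_committed(log, status))  # ['a', 'c', 'd']
```

Note how the committed record "d" is held back until the open transaction before it finishes; this is why a long-running or stalled transaction delays every read_committed consumer of that partition.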

State Store Exactly-Once

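import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;

// State store updates are atomic with output
// (Order, OrderSummary, and orderSummarySerde are application-defined;
//  orders is a KStream<String, Order>)
KTable<String, OrderSummary> summary = orders
    .groupByKey()
    .aggregate(
        OrderSummary::new,
        // The lambda parameter must not reuse the name `summary`, which is
        // already in scope as the variable being declared
        (key, order, agg) -> agg.addOrder(order),
        Materialized.<String, OrderSummary, KeyValueStore<Bytes, byte[]>>as(
            "order-summary-store"
        ).withValueSerde(orderSummarySerde)  // the default String serde cannot serialize OrderSummary
    );

// The state store is backed by a changelog topic
// Changelog writes join the same transaction as output records and offsets
// On failure, the store is restored from committed changelog records only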

State Store Recovery

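Failure Scenario:
1. Processor reads 100 records
2. Writes updates for 50 of them to the state store and its changelog
3. Crashes before the transaction commits

Recovery:
1. Consumer restarts from the last committed offset
2. State store restored from the changelog topic; the 50 uncommitted changelog writes belong to an aborted transaction and are skipped
3. Re-processes all 100 records against the restored (last committed) state
4. Each record updates the store exactly once, so the updates need not be idempotent
5. Transaction commits successfully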
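The recovery steps can be illustrated with a toy Python model (hypothetical, not Kafka Streams internals): per-key counts are logged to a changelog tagged with the transaction that wrote them, and restoration replays only committed entries, as a read_committed restore would. Reprocessing the crashed batch on the restored store counts each record once; reprocessing it on the crashed instance's dirty local store double-counts, which is why that local state is not trusted after an unclean shutdown.

```python
# Toy model of changelog-based recovery (not Kafka Streams internals).
# Each changelog entry is (txn_id, key, new_value); only entries whose
# transaction committed are replayed on restore.

def process_batch(keys, store, changelog, txn_id, crash_at=None):
    """Count records per key; returns False if the batch 'crashes' before commit."""
    for i, key in enumerate(keys):
        if crash_at is not None and i == crash_at:
            return False
        store[key] = store.get(key, 0) + 1
        changelog.append((txn_id, key, store[key]))
    return True


def restore(changelog, committed_txns):
    store = {}
    for txn_id, key, value in changelog:
        if txn_id in committed_txns:   # aborted/uncommitted writes are skipped
            store[key] = value
    return store


def simulate():
    changelog, committed = [], set()
    store = {}
    batch1 = ["u1", "u2", "u1"]
    batch2 = ["u1", "u2", "u2"]

    if process_batch(batch1, store, changelog, txn_id=1):
        committed.add(1)

    # Crash after two of batch2's three updates; the local store is now dirty.
    process_batch(batch2, store, changelog, txn_id=2, crash_at=2)
    dirty = dict(store)

    # Correct recovery: rebuild from the committed changelog, then reprocess batch2.
    recovered = restore(changelog, committed)
    if process_batch(batch2, recovered, changelog, txn_id=3):
        committed.add(3)

    # Naive recovery for contrast: reprocess on top of the dirty local store.
    process_batch(batch2, dirty, [], txn_id=99)
    return recovered, dirty


recovered, naive = simulate()
print(recovered)  # {'u1': 3, 'u2': 3}
print(naive)      # {'u1': 4, 'u2': 4}: double-counted
```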

Exactly-Once Configuration

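# Application configuration
application.id=exactly-once-app
bootstrap.servers=kafka1:9092,kafka2:9092

# Processing guarantee (exactly_once_v2 requires brokers 2.5+)
processing.guarantee=exactly_once_v2

# Commit interval (defaults to 100 ms under exactly-once, 30000 ms otherwise)
commit.interval.ms=100

# Producer settings (set automatically by Kafka Streams under exactly-once)
acks=all
enable.idempotence=true
retries=2147483647

# Transaction timeout
transaction.timeout.ms=60000

# Consumer settings (also set automatically)
isolation.level=read_committed
enable.auto.commit=false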

Monitoring Exactly-Once

import time

from prometheus_client import Counter, Gauge, Histogram

# Metrics
processed_records = Counter(
    'kafka_streams_processed_records_total',
    'Total processed records'
)

transaction_commits = Counter(
    'kafka_streams_transaction_commits_total',
    'Total transaction commits'
)

transaction_aborts = Counter(
    'kafka_streams_transaction_aborts_total',
    'Total transaction aborts'
)

processing_lag = Gauge(
    'kafka_streams_processing_lag',
    'Processing lag in records'
)

processing_latency = Histogram(
    'kafka_streams_processing_latency_seconds',
    'Processing latency',
    buckets=[0.001, 0.01, 0.1, 1.0]
)

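def monitor_streams_application(app):
    """Export metrics for a Kafka Streams application.

    `app.state` is a hypothetical stats object exposing cumulative
    processed/commit/abort counts and offsets; adapt it to whatever your
    application actually exposes (e.g. JMX metrics).
    """
    last = {"processed": 0, "commits": 0, "aborts": 0}
    while True:
        # Get state
        state = app.state
        
        # Counters may only move forward by the delta since the last poll;
        # passing cumulative totals to inc() would over-count
        processed_records.inc(state.processed_count - last["processed"])
        transaction_commits.inc(state.commit_count - last["commits"])
        transaction_aborts.inc(state.abort_count - last["aborts"])
        last = {"processed": state.processed_count,
                "commits": state.commit_count,
                "aborts": state.abort_count}
        
        # Lag = log end offset - committed offset
        processing_lag.set(state.end_offset - state.committed_offset)
        
        time.sleep(10)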

Performance Considerations

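Exactly-Once Overhead:
├── Producer transactions add latency to each commit
├── Each commit writes a marker to every partition in the transaction, so more partitions increase commit time
├── State store updates require changelog writes
└── Network round trips to the transaction coordinator (adding partitions and offsets, committing)

Optimization Strategies:
├── Batch more records per transaction (raise commit.interval.ms; trades end-to-end latency for throughput)
├── Optimize state store (RocksDB tuning)
├── Keep transaction timeout as low as processing allows (a stalled transaction blocks read_committed consumers until it times out)
├── Rely on cooperative rebalancing (the Kafka Streams default since 2.4)
└── Monitor commit latency and tune commit.interval.ms accordingly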

Common Pitfalls

1. Transaction Timeout Too Short

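// Problem: a transaction stays open longer than the timeout, so the
// coordinator aborts it and the producer's next transactional call fails
props.put("transaction.timeout.ms", "10000");  // 10 seconds

// Solution: increase the timeout for long processing; it must not exceed
// the broker's transaction.max.timeout.ms (default 15 minutes)
props.put("transaction.timeout.ms", "300000");  // 5 minutes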
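A quick sanity check for a proposed timeout can be written down directly. `check_transaction_timeout` is a hypothetical helper, and the worst-case transaction duration is an input you would estimate for your own workload; the 900000 ms constant is the broker's default transaction.max.timeout.ms, and a producer asking for more than the broker maximum is rejected when it initializes transactions.

```python
# Broker-side default for transaction.max.timeout.ms (15 minutes).
BROKER_MAX_TIMEOUT_MS = 900_000


def check_transaction_timeout(transaction_timeout_ms: int,
                              worst_case_txn_ms: int,
                              broker_max_timeout_ms: int = BROKER_MAX_TIMEOUT_MS) -> list:
    """Return a list of problems with a proposed transaction.timeout.ms."""
    problems = []
    if transaction_timeout_ms > broker_max_timeout_ms:
        problems.append("timeout exceeds the broker's transaction.max.timeout.ms")
    if worst_case_txn_ms >= transaction_timeout_ms:
        problems.append("transactions can outlive the timeout and be aborted")
    return problems


print(check_transaction_timeout(10_000, worst_case_txn_ms=30_000))   # ['transactions can outlive the timeout and be aborted']
print(check_transaction_timeout(300_000, worst_case_txn_ms=30_000))  # []
```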

2. Consumer Rebalances During Transaction

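Problem:
1. Transaction starts
2. A rebalance moves the task to another instance before the commit (e.g., the instance exceeded max.poll.interval.ms)
3. The open transaction is aborted and the old producer is fenced
4. The new owner reprocesses the records from the last committed offset

Solution:
- Use static group membership (group.instance.id) so restarts within session.timeout.ms don't trigger rebalances
- Keep per-poll processing time well under max.poll.interval.ms
- No partial recovery is needed: an aborted transaction is rolled back in full, so reprocessing produces no duplicates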

3. State Store Corruption

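Problem: State store inconsistent after crash (local files may contain writes from an uncommitted transaction)

Solution:
- Keep changelog topics enabled (the default; avoid Materialized.withLoggingDisabled() for stores that must survive failures)
- Under exactly-once, Kafka Streams discards local state after an unclean shutdown and restores it from the committed changelog
- Configure standby replicas (num.standby.replicas) to shorten restoration time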

Follow-Up Questions

  1. What is the difference between exactly_once and exactly_once_v2?
  2. How does Kafka Streams handle transaction timeouts?
  3. Explain the role of changelog topics in exactly-once processing.
  4. What happens when a rebalance occurs during an active transaction?
  5. How would you troubleshoot exactly-once processing failures?
