Kafka Streams Architecture
Difficulty: Senior Level | Companies: LinkedIn, Uber, Netflix, Spotify, Confluent
Content
Kafka Streams is a client library for building stream processing applications on top of Kafka. It provides stateful processing, windowing, and exactly-once semantics.
Architecture Overview
Architecture Diagram
Kafka Streams Application:
┌───────────────────────────────────────────────┐
│ Stream Thread 1                               │
│  ┌─────────────┐    ┌─────────────┐           │
│  │ Source      │    │ Processor   │           │
│  │ (Consumer)  │───▶│ (Transform) │           │
│  └─────────────┘    └─────────────┘           │
│         │                  │                  │
│         ▼                  ▼                  │
│  ┌─────────────┐    ┌─────────────┐           │
│  │ State Store │    │ Sink        │           │
│  │ (RocksDB)   │    │ (Producer)  │           │
│  └─────────────┘    └─────────────┘           │
└───────────────────────────────────────────────┘
Processing Topology
KStream (unbounded stream)
    │
    ├── Filter
    ├── Map
    ├── FlatMap
    └── WindowedAggregate
            │
            ▼
KTable (changelog-backed state)
    │
    ├── Join
    └── Aggregate
            │
            ▼
Output KStream
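The difference between the two abstractions in the topology above can be sketched in plain Java, without any Kafka classes. This is only a model under stated assumptions: the `Event` type and the `asStream`/`asTable` helpers are hypothetical names, and a null value is treated as a delete, as a tombstone would be in a changelog.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of stream/table duality; no Kafka classes are used.
final class StreamTableSketch {

    // Hypothetical record type standing in for a keyed Kafka message.
    static final class Event {
        final String key;
        final String value;

        Event(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Stream view: every record is an independent fact, so nothing is overwritten.
    static List<Event> asStream(List<Event> events) {
        return new ArrayList<>(events);
    }

    // Table view: each record is an upsert for its key; a null value deletes the key.
    static Map<String, String> asTable(List<Event> events) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Event e : events) {
            if (e.value == null) {
                table.remove(e.key);
            } else {
                table.put(e.key, e.value);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("alice", "pending"),
            new Event("bob", "pending"),
            new Event("alice", "shipped"));
        System.out.println(asStream(events).size()); // prints 3
        System.out.println(asTable(events));         // prints {alice=shipped, bob=pending}
    }
}
```

The same three records yield three stream entries but only two table rows, because the second "alice" record replaces the first.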
Basic Kafka Streams Application
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;

public class StreamProcessor {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
            Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
            Serdes.StringSerde.class);
        // Exactly-once semantics
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
            StreamsConfig.EXACTLY_ONCE_V2);

        StreamsBuilder builder = new StreamsBuilder();

        // Source stream
        KStream<String, String> orders = builder.stream("orders");

        // Transform stream: drop null values, upper-case the rest, log each key
        KStream<String, String> processed = orders
            .filter((key, value) -> value != null)
            .mapValues(value -> value.toUpperCase())
            .peek((key, value) -> System.out.println("Processing: " + key));

        // Sink
        processed.to("processed-orders");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Graceful shutdown: register the hook before starting the stream threads
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
Python Kafka Streams (Faust)
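import faust

# Faust is a separate Python library modeled on Kafka Streams, not a port of it
app = faust.App(
    'stream-processor',
    # Multiple brokers are separated by semicolons, not commas
    broker='kafka://kafka1:9092;kafka2:9092',
    processing_guarantee='exactly_once',
)


# Hypothetical order schema, used only for illustration
class Order(faust.Record):
    user_id: str
    item: str


# Define topics
orders_topic = app.topic('orders', value_type=Order)
processed_orders = app.topic('processed-orders', value_type=Order)

# Define table (state store)
order_counts = app.Table(
    'order-counts',
    default=int,
    partitions=4,
)


def transform(order: Order) -> Order:
    # Placeholder transformation: upper-case the item name
    return Order(user_id=order.user_id, item=order.item.upper())


@app.agent(orders_topic)
async def process_orders(stream):
    # Assumes the orders topic is keyed by user_id so table updates stay partition-local
    async for order in stream:
        # Process each order
        processed = transform(order)
        # Update state
        order_counts[order.user_id] += 1
        # Send to output topic
        await processed_orders.send(key=order.user_id, value=processed)


@app.task
async def on_start():
    print("Stream processor started")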
State Stores
RocksDB State Store
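// Aggregate orders per key into a KTable backed by a named local store.
// The store lives in RocksDB by default and is backed by a changelog topic.
KTable<String, OrderSummary> orderSummary = orders
    .groupByKey()
    .aggregate(
        OrderSummary::new,
        // addOrder must return the updated summary
        (key, order, summary) -> summary.addOrder(order),
        Materialized.<String, OrderSummary, KeyValueStore<Bytes, byte[]>>as(
                "order-summary-store")
            .withKeySerde(Serdes.String())
            .withValueSerde(new OrderSummarySerde())
            // withRetention() is omitted: it only applies to window and session stores
    );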
Custom State Store
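// Custom state store implementation (minimal in-memory sketch).
// Needs java.nio.charset.StandardCharsets, java.util.Map,
// java.util.concurrent.ConcurrentHashMap, and org.apache.kafka.streams.processor.*
public class CustomStateStore implements StateStore {
    private final String name;
    private final Map<String, byte[]> store = new ConcurrentHashMap<>();
    private volatile boolean open = false;

    public CustomStateStore(String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }

    // On Kafka 3.x the deprecated init(ProcessorContext, StateStore) overload is
    // still abstract and must be implemented as well.
    @Override
    public void init(StateStoreContext context, StateStore root) {
        // Register the store with a callback that replays changelog records on restore
        context.register(root, (key, value) -> {
            String k = new String(key, StandardCharsets.UTF_8);
            if (value == null) {
                store.remove(k); // tombstone
            } else {
                store.put(k, value);
            }
        });
        open = true;
    }

    @Override
    public void flush() {
        // Flush cached writes to persistent storage (no-op for this in-memory map)
    }

    @Override
    public void close() {
        // Cleanup resources
        store.clear();
        open = false;
    }

    @Override
    public boolean persistent() {
        return false;
    }

    @Override
    public boolean isOpen() {
        return open;
    }

    public byte[] get(String key) {
        return store.get(key);
    }

    public void put(String key, byte[] value) {
        store.put(key, value);
    }
}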
Windowing Operations
Tumbling Windows
// Fixed-size, non-overlapping windows
KTable<Windowed<String>, Long> tumblingCounts = orders
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count();
// Window boundaries:
// [00:00, 00:05), [00:05, 00:10), [00:10, 00:15), ...
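The boundaries listed above follow from aligning windows to the epoch. A minimal plain-Java sketch of that arithmetic is shown here; `TumblingWindowSketch` and `windowStart` are hypothetical names, not Kafka Streams API, and the sketch assumes non-negative millisecond timestamps.

```java
// Plain-Java sketch of epoch-aligned tumbling windows; no Kafka classes are used.
final class TumblingWindowSketch {

    // Start of the window [start, start + sizeMs) that contains timestampMs.
    // Assumes timestampMs >= 0 and sizeMs > 0.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        long ts = 7 * 60 * 1000L + 30_000L; // 00:07:30 after the epoch
        long start = windowStart(ts, fiveMinutes);
        // prints 300000 .. 600000, i.e. the window [00:05, 00:10)
        System.out.println(start + " .. " + (start + fiveMinutes));
    }
}
```

Because each timestamp maps to exactly one start value, every record belongs to exactly one tumbling window.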
Hopping Windows
// Fixed-size, overlapping windows
KTable<Windowed<String>, Long> hoppingCounts = orders
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10))
        .advanceBy(Duration.ofMinutes(5)))
    .count();
// Window boundaries:
// [00:00, 00:10), [00:05, 00:15), [00:10, 00:20), ...
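Because hopping windows overlap, one record can fall into several of them. The sketch below lists the start times of every window that contains a given timestamp, assuming windows begin at multiples of the advance interval and never before time zero. `HoppingWindowSketch` and `windowStartsFor` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of hopping-window assignment; no Kafka classes are used.
final class HoppingWindowSketch {

    // Start times of every window [start, start + sizeMs) that contains timestampMs,
    // for windows that begin at multiples of advanceMs.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still covers the timestamp.
        long first = Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (long start = first; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // A record at 00:07 with 10-minute windows advancing every 5 minutes
        // belongs to [00:00, 00:10) and [00:05, 00:15).
        System.out.println(windowStartsFor(7 * minute, 10 * minute, 5 * minute));
        // prints [0, 300000]
    }
}
```

With a size of 10 minutes and an advance of 5 minutes, most records land in two windows, so each one is counted twice across the result table.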
Sliding Windows
// Fixed-size windows that slide continuously along the time axis:
// two records share a window if their timestamps differ by at most 5 minutes
KTable<Windowed<String>, Long> slidingCounts = orders
    .groupByKey()
    .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
    .count();
Session Windows
// Dynamic windows based on activity
KTable<Windowed<String>, Long> sessionCounts = orders
    .groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(10)))
    .count();
// A record that arrives within the inactivity gap of an existing session is merged into it
// Good for user activity tracking
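Session merging can be illustrated with a small plain-Java sketch that groups the timestamps of one key into sessions. It is a simplified model, not the Kafka Streams implementation: `SessionWindowSketch` and `sessions` are hypothetical names, all timestamps are assumed to be available up front, and a difference exactly equal to the gap is assumed to stay in the same session.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of session windowing for a single key; no Kafka classes are used.
final class SessionWindowSketch {

    // Returns sessions as {start, end} pairs. A timestamp joins the current session
    // when it is at most gapMs after that session's latest timestamp.
    static List<long[]> sessions(long[] timestampsMs, long gapMs) {
        long[] sorted = timestampsMs.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            if (!result.isEmpty() && ts - result.get(result.size() - 1)[1] <= gapMs) {
                result.get(result.size() - 1)[1] = ts; // extend the current session
            } else {
                result.add(new long[] {ts, ts});       // start a new session
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Values are minutes for readability; the gap is 10 minutes.
        for (long[] s : sessions(new long[] {0, 4, 9, 25, 30}, 10)) {
            System.out.println(Arrays.toString(s));
        }
        // prints [0, 9] then [25, 30]
    }
}
```

Unlike tumbling and hopping windows, the session boundaries here depend entirely on the data, so two keys with different activity patterns get different windows.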
Join Operations
KStream-KStream Join
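// Join two streams on key; stream-stream joins require a JoinWindows argument
// (userProfiles is a KStream here; the 5-minute window is an illustrative value)
KStream<String, EnrichedOrder> enrichedOrders = orders
    .join(
        userProfiles,
        (order, profile) -> new EnrichedOrder(order, profile),
        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
        StreamJoined.with(Serdes.String(), new OrderSerde(), new ProfileSerde())
    );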
KStream-KTable Join
// Enrich stream with table lookup
KStream<String, EnrichedOrder> enrichedOrders = orders
    .join(
        orderSummaryTable,
        (order, summary) -> new EnrichedOrder(order, summary),
        Joined.with(Serdes.String(), new OrderSerde(), new SummarySerde())
    );
Left Join
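// Include records even if no match: profile is null when the key is absent
// (userProfiles is a KTable here, which is why Joined is used and no window is given)
KStream<String, EnrichedOrder> enrichedOrders = orders
    .leftJoin(
        userProfiles,
        (order, profile) -> new EnrichedOrder(order, profile),
        Joined.with(Serdes.String(), new OrderSerde(), new ProfileSerde())
    );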
Key Insight: KStream-KStream joins require a join window (JoinWindows) because both sides are unbounded. KStream-KTable joins look up the table's current value for each stream record and do not take a window.
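The two join styles can be contrasted in a plain-Java sketch that uses no Kafka classes. `JoinSketch`, `Rec`, `windowedJoin`, and `lookupJoin` are hypothetical names. The table is modeled as a fixed snapshot, whereas a real stream-table join reads whatever the table holds when each stream record is processed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java model of windowed (stream-stream) and lookup (stream-table) joins.
final class JoinSketch {

    // Hypothetical keyed, timestamped record.
    static final class Rec {
        final String key;
        final String value;
        final long ts;

        Rec(String key, String value, long ts) {
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    // Stream-stream: pair records with equal keys whose timestamps
    // differ by at most windowMs.
    static List<String> windowedJoin(List<Rec> left, List<Rec> right, long windowMs) {
        List<String> out = new ArrayList<>();
        for (Rec l : left) {
            for (Rec r : right) {
                if (l.key.equals(r.key) && Math.abs(l.ts - r.ts) <= windowMs) {
                    out.add(l.value + "+" + r.value);
                }
            }
        }
        return out;
    }

    // Stream-table: one lookup per stream record, no time condition.
    static List<String> lookupJoin(List<Rec> stream, Map<String, String> table) {
        List<String> out = new ArrayList<>();
        for (Rec s : stream) {
            String match = table.get(s.key);
            if (match != null) {
                out.add(s.value + "+" + match);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> orders = List.of(new Rec("u1", "order1", 1_000L));
        List<Rec> clicks = List.of(
            new Rec("u1", "click1", 2_000L),
            new Rec("u1", "click2", 900_000L));
        System.out.println(windowedJoin(orders, clicks, 300_000L)); // prints [order1+click1]
        System.out.println(lookupJoin(orders, Map.of("u1", "gold"))); // prints [order1+gold]
    }
}
```

Without the window condition, the stream-stream join would have to retain every record from both sides indefinitely, which is why the window is mandatory there.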
Exactly-Once in Streams
// Enable exactly-once processing (EXACTLY_ONCE_V2 requires brokers on version 2.5 or newer)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
    StreamsConfig.EXACTLY_ONCE_V2);
// What happens:
// 1. Producer transactions are enabled automatically
// 2. Consumer offsets are committed within those transactions
// 3. State store changelog writes and output records commit atomically
// 4. Changelog topics allow state to be rebuilt after a failure
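The effect of committing offsets, state, and output together can be shown with a toy plain-Java model. It does not reflect how Kafka transactions are implemented. `AtomicCommitSketch`, `processBatch`, and the `failAt` parameter are hypothetical and exist only to show that an aborted attempt leaves no partial results behind.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of read-process-write atomicity; no Kafka classes are used.
final class AtomicCommitSketch {
    // Committed view: what downstream readers and a restarted instance would see.
    final List<String> output = new ArrayList<>();
    final Map<String, Integer> state = new HashMap<>();
    int committedOffset = 0;

    // Processes input from committedOffset to the end as one transaction.
    // If failAt is reached, the attempt "crashes" and nothing is committed.
    boolean processBatch(List<String> input, int failAt) {
        List<String> pendingOutput = new ArrayList<>();
        Map<String, Integer> pendingState = new HashMap<>(state);
        for (int i = committedOffset; i < input.size(); i++) {
            if (i == failAt) {
                return false; // abort: staged output and state are dropped
            }
            String key = input.get(i);
            int count = pendingState.merge(key, 1, Integer::sum);
            pendingOutput.add(key + "=" + count);
        }
        // Commit: output, state, and offset become visible together.
        output.addAll(pendingOutput);
        state.clear();
        state.putAll(pendingState);
        committedOffset = input.size();
        return true;
    }

    public static void main(String[] args) {
        AtomicCommitSketch app = new AtomicCommitSketch();
        List<String> input = List.of("a", "b", "a");
        app.processBatch(input, 2);  // fails on the third record, commits nothing
        app.processBatch(input, -1); // retry from offset 0 succeeds
        System.out.println(app.output); // prints [a=1, b=1, a=2]
    }
}
```

If the output had been published before the offset was committed, the retry would have emitted `a=1` and `b=1` a second time. Staging all three changes and applying them together is what prevents that duplication.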
Error Handling
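// Handle deserialization errors: the config value must be a concrete handler class
// (built-in options: LogAndContinueExceptionHandler, LogAndFailExceptionHandler)
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    CustomExceptionHandler.class);

// Custom exception handler. This is the Kafka 3.x signature; newer releases
// also offer an overload that takes an ErrorHandlerContext.
public class CustomExceptionHandler implements DeserializationExceptionHandler {
    @Override
    public DeserializationHandlerResponse handle(
            ProcessorContext context,
            ConsumerRecord<byte[], byte[]> record,
            Exception exception) {
        // Log error, send to dead letter topic
        System.err.println("Deserialization failed for topic " + record.topic()
            + " at offset " + record.offset() + ": " + exception.getMessage());
        // Option 1: Skip the record and continue processing
        return DeserializationHandlerResponse.CONTINUE;
        // Option 2: Stop processing
        // return DeserializationHandlerResponse.FAIL;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed
    }
}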
Follow-Up Questions
- What is the difference between KStream and KTable?
- How does Kafka Streams handle state store fault tolerance?
- Explain the difference between tumbling, hopping, and session windows.
- What happens during a rebalance in a Kafka Streams application?
- How does exactly-once semantics work in Kafka Streams?