Kafka Streams & Connect API: Stream Processing and Data Integration
Architecture Diagram: Kafka Streams Topology
KAFKA STREAMS TOPOLOGY & PROCESSING ARCHITECTURE

  SOURCE TOPICS                  PROCESSOR TOPOLOGY                         SINK TOPICS
  raw-events (6 partitions) ───► SOURCE (KStream from topic)
                                   │
                                   ▼
                                 FILTER     filter((k, e) -> e.isValid())
                                   │
  user-data (3 partitions) ────► JOIN       KStream-KTable join
                                   │
                                   ▼
                                 AGGREGATE  windowedBy(TimeWindows)
                                   │
                                   ▼
                                 TO (branch) ────────────────────────────► enriched-events (6 partitions)
                                                                           alerts (3 partitions)

  DSL operations
    KStream:         map, filter, flatMap, join
    KTable:          filter, mapValues, join
    KGroupedStream:  count, reduce, aggregate

  LOCAL STATE STORES
    Store              Key          Value        Backend
    user-state-store   user_id      UserState    RocksDB
    window-agg-store   window_key   AggResult    RocksDB
    session-store      session_id   Session      RocksDB
    global-kv-store    any_key      any_val      In-memory
Architecture Diagram: Kafka Connect Framework
KAFKA CONNECT FRAMEWORK ARCHITECTURE

  EXTERNAL SYSTEMS               CONNECT CLUSTER                        KAFKA CLUSTER
  MySQL database         ───►    SOURCE connector               ───►    topic-1 (3 partitions)
  PostgreSQL database    ───►      Task 0, Task 1, Task 2               topic-2 (6 partitions)
  MongoDB (NoSQL)                                                       topic-3 (3 partitions)
  Elasticsearch (search)
  S3 bucket (object storage)

  Files (CSV/JSON)       ◄───    SINK connector                 ◄───    subscribed topics
  JDBC target            ◄───      Task 0, Task 1
  HDFS (data lake)       ◄───

  Shared worker services
    REST API / CLI:      POST /connectors, GET /connectors, PUT /connectors/{name}/config, DELETE /connectors/{name}
    Config providers:    Vault, AWS Secrets Manager, Azure Key Vault
    Converters:          Avro, JSON, Protobuf
    Transforms (SMTs):   InsertField, ReplaceField, MaskField, TimestampRouter, Flatten
    Dead letter queue:   errors-topic (failed sink records)
Architecture Diagram: Stream Processing Patterns
STREAM PROCESSING PATTERNS & STATE MANAGEMENT

PATTERN 1: EVENT-TIME WINDOWING
  Input stream: e1 (t=100), e2 (t=150), e3 (t=200), e4 (t=250), e5 (t=300)
  Time windows are half-open [start, end), so an event at t=200 belongs to [200-300), not [100-200).

  Tumbling window (size=100ms):
    [100-200)   e1, e2   count=2
    [200-300)   e3, e4   count=2
    [300-400)   e5       count=1

  Hopping window (size=100ms, advance=50ms); each event lands in size/advance = 2 windows:
    [50-150)    e1       count=1
    [100-200)   e1, e2   count=2
    [150-250)   e2, e3   count=2
    [200-300)   e3, e4   count=2
    [250-350)   e4, e5   count=2
    [300-400)   e5       count=1

  Session window (inactivity gap=50ms):
    Consecutive events are exactly 50ms apart, never more than the gap, so they all merge:
    Session [100-300]   e1-e5   count=5
    With a 40ms gap, every event would instead form its own session (5 sessions, count=1 each).

PATTERN 2: KSTREAM-KTABLE JOIN
  KStream (orders, keyed by user):   KTable (users):       Joined result:
    order-1: user-A                    user-A: {premium}     order-1: {user: premium, discount: 10%}
    order-2: user-B                    user-B: {basic}       order-2: {user: basic, discount: 0%}
    order-3: user-A                    user-C: {premium}     order-3: {user: premium, discount: 10%}
  Each stream record looks up the current table value for its key; user-C has no orders, so it produces no output.

PATTERN 3: EXACTLY-ONCE STREAM PROCESSING
  TRANSACTION BOUNDARY
    1. Read from source topic          ───► position tracked
    2. Process / transform records     ───► local store updated, changes written to its changelog topic
    3. Write to sink topic             ───► output produced
    4. Commit source offsets           ───► offsets committed atomically with the output
  Until the commit, the transaction can be aborted: output writes, changelog writes, and offsets are discarded together, and after a crash the local state is rebuilt from the committed changelog.
Formal Definitions
Stream Processing Topology
A stream processing topology is a directed acyclic graph (DAG) of processors in which nodes represent operations (source, transformation, sink) and edges represent data flow. The topology defines how data moves from source topics through transformations to sink topics. Kafka Streams parallelizes it by creating one task per input partition (per sub-topology) and distributing those tasks across application instances.
Windowing
Windowing is the grouping of records by time into finite buckets for aggregation. Kafka Streams supports three window types: tumbling windows (fixed-size, non-overlapping), hopping windows (fixed-size, overlapping with a configurable advance), and session windows (activity-based, variable-size, bounded by an inactivity gap). Windowing enables time-bounded aggregations on unbounded streams.
KTable
A KTable is an abstraction of a changelog stream in which each key has at most one value. It represents the latest state for each key, enabling point lookups and joins. Unlike a KStream, where every record is an independent event, KTable updates are upserts: a new value for a key replaces the previous one, and a null value (tombstone) deletes the key. A materialized KTable is backed by a local state store (RocksDB by default) with a changelog topic for fault tolerance.
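The upsert-versus-append distinction can be seen without Kafka at all. The sketch below is plain Java, not the Kafka Streams API: it reads the same keyed records once as a stream (every record kept) and once as a table (latest value per key, with a null value acting as a tombstone). The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of stream-table duality; this is NOT the Kafka Streams API.
class StreamTableDuality {
    /** KStream view: every record is an independent event, so all are kept. */
    static List<String[]> asStream(List<String[]> records) {
        return new ArrayList<>(records);
    }

    /** KTable view: upsert per key; a null value (tombstone) deletes the key. */
    static Map<String, String> asTable(List<String[]> records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] kv : records) {
            if (kv[1] == null) {
                table.remove(kv[0]);
            } else {
                table.put(kv[0], kv[1]);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<String[]> records = List.of(
                new String[] {"user-A", "basic"},
                new String[] {"user-B", "basic"},
                new String[] {"user-A", "premium"}, // update for user-A
                new String[] {"user-B", null});     // tombstone for user-B
        System.out.println("stream records: " + asStream(records).size()); // 4
        System.out.println("table state:    " + asTable(records));         // {user-A=premium}
    }
}
```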
Key Formulas
Window Aggregation (Tumbling)
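$$\mathrm{Agg}(W_i) = \bigoplus_{e \in W_i} f(e), \qquad W_i = [t_i,\ t_i + s)$$
Here,
- $W_i$ = Tumbling window $[t_i, t_i + s)$
- $s$ = Window size (e.g., 5 minutes)
- $f(e)$ = Aggregation function applied to each event $e$
- $\bigoplus$ = Aggregation operator (count, sum, reduce, etc.)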
Window Aggregation (Hopping)
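$$\mathrm{Agg}(W_i) = \bigoplus_{e \in W_i} f(e), \qquad W_i = [t_i,\ t_i + s), \qquad t_i = t_0 + i \cdot a$$
Here,
- $s$ = Window size
- $a$ = Advance interval ($a < s$ for overlap)
- $t_i$ = Window start time, $t_i = t_0 + i \cdot a$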
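A small plain-Java sketch makes the window formulas concrete. It assumes epoch-aligned windows (t_0 = 0) and computes every start t_i whose window [t_i, t_i + s) contains a given timestamp; a tumbling window is the special case a = s. The alignment arithmetic reflects our understanding of how Kafka Streams places fixed-size windows, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Sketch: epoch-aligned window assignment for tumbling (advance == size)
// and hopping (advance < size) windows, plus per-window event counts.
class WindowAssignment {
    /** Start times of every window [start, start + size) that contains timestamp t. */
    static List<Long> windowStarts(long t, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        // Smallest aligned start that still satisfies start > t - size
        long start = (Math.max(0, t - size + advance) / advance) * advance;
        for (; start <= t; start += advance) {
            starts.add(start);
        }
        return starts;
    }

    /** Count events per window start; TreeMap keeps windows in time order. */
    static SortedMap<Long, Integer> countPerWindow(long[] timestamps, long size, long advance) {
        SortedMap<Long, Integer> counts = new TreeMap<>();
        for (long t : timestamps) {
            for (long s : windowStarts(t, size, advance)) {
                counts.merge(s, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] events = {100, 150, 200, 250, 300};
        System.out.println("tumbling: " + countPerWindow(events, 100, 100)); // {100=2, 200=2, 300=1}
        System.out.println("hopping:  " + countPerWindow(events, 100, 50));  // {50=1, 100=2, 150=2, 200=2, 250=2, 300=1}
    }
}
```

With size 100 and advance 50, every event falls into s/a = 2 hopping windows, while each tumbling window holds exactly the events in its own 100 ms slice.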
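Kafka Streams uses event-time semantics by default: the default timestamp extractor (FailOnInvalidTimestamp) reads the timestamp embedded in each record, which the producer or broker sets. Switching to WallclockTimestampExtractor gives processing-time semantics instead, and a custom TimestampExtractor implementation can pull the event time out of the payload. Rather than watermarks, Kafka Streams tracks stream time (the largest timestamp observed so far); a record that arrives after its window's end plus the grace period is dropped. Configure the grace period with TimeWindows.ofSizeWithNoGrace() or TimeWindows.ofSizeAndGrace().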
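The stream-time and grace-period rule can be modeled in a few lines. This is a simplified sketch for tumbling windows only, assuming the rule that a window stops accepting records once stream time reaches window end plus grace; it is not Kafka's implementation, and the class name is hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Simplified model of late-record handling for tumbling windows (illustrative, not Kafka code):
// stream time = largest timestamp seen so far; window [start, start + size) closes
// once stream time reaches start + size + grace.
class GracePeriodSketch {
    private final long size;
    private final long grace;
    private long streamTime = Long.MIN_VALUE;
    final Map<Long, Integer> counts = new TreeMap<>();

    GracePeriodSketch(long size, long grace) {
        this.size = size;
        this.grace = grace;
    }

    /** Returns true if the record was aggregated, false if it was dropped as too late. */
    boolean process(long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        long windowStart = (timestamp / size) * size;
        long windowEnd = windowStart + size;
        if (windowEnd + grace <= streamTime) {
            return false; // window already closed
        }
        counts.merge(windowStart, 1, Integer::sum);
        return true;
    }

    public static void main(String[] args) {
        long[] arrivals = {100, 250, 150}; // the t=150 record arrives after t=250
        GracePeriodSketch noGrace = new GracePeriodSketch(100, 0);     // like ofSizeWithNoGrace
        GracePeriodSketch withGrace = new GracePeriodSketch(100, 100); // like a 100ms grace period
        for (long t : arrivals) {
            System.out.println("t=" + t + " noGrace=" + noGrace.process(t)
                    + " withGrace=" + withGrace.process(t));
        }
        // last line: t=150 noGrace=false withGrace=true
    }
}
```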
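For exactly-once stream processing, set processing.guarantee=exactly_once_v2. The Streams library then configures transactional (and therefore idempotent) producers and read_committed consumers itself. Compared with the original exactly_once mode (now deprecated), v2 uses one transactional producer per stream thread instead of one per task, which reduces resource usage and improves scalability; it requires brokers running version 2.5 or later.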
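The atomicity that exactly-once relies on (output records and consumed offsets become visible together or not at all) is easiest to see in a toy model. The sketch below is not the Kafka producer or Streams API; TransactionSketch and its methods are hypothetical, and uppercasing stands in for real processing.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a read-process-write transaction (not a Kafka API):
// output and the input offset are published together on commit, or discarded together on abort.
class TransactionSketch {
    final List<String> input;
    final List<String> committedOutput = new ArrayList<>(); // what read_committed consumers see
    long committedOffset = 0;                                 // next input offset to process

    private final List<String> pendingOutput = new ArrayList<>();
    private long pendingOffset = 0;

    TransactionSketch(List<String> input) {
        this.input = input;
    }

    void begin() {
        pendingOutput.clear();
        pendingOffset = committedOffset; // always resume from the last committed offset
    }

    void processBatch(int maxRecords) {
        for (int i = 0; i < maxRecords && pendingOffset < input.size(); i++) {
            pendingOutput.add(input.get((int) pendingOffset).toUpperCase()); // the "transform"
            pendingOffset++;
        }
    }

    void commit() { // output and offset move forward together
        committedOutput.addAll(pendingOutput);
        committedOffset = pendingOffset;
        pendingOutput.clear();
    }

    void abort() { // discard both output and offset progress
        pendingOutput.clear();
        pendingOffset = committedOffset;
    }

    public static void main(String[] args) {
        TransactionSketch tx = new TransactionSketch(List.of("a", "b", "c", "d"));
        tx.begin(); tx.processBatch(2); tx.abort();  // simulated failure mid-transaction
        tx.begin(); tx.processBatch(2); tx.commit(); // retry re-reads a, b
        tx.begin(); tx.processBatch(2); tx.commit();
        System.out.println(tx.committedOutput + " offset=" + tx.committedOffset); // [A, B, C, D] offset=4
    }
}
```

Even though a and b were processed twice, each appears once in the committed output, because the aborted attempt never advanced the offset or published its results.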
Detailed Explanation
Kafka Streams is a client library for building stream processing applications on top of Kafka. Unlike traditional stream processing frameworks (Apache Flink, Apache Spark Streaming), Kafka Streams runs within the application process, requiring no separate cluster management. This embedded architecture simplifies deployment and scaling, as each instance of the application is a stream processor that participates in the consumer group for the input topics. The library provides both a high-level DSL (Domain Specific Language) for common operations and a Processor API for custom processing logic.
Topology and Processor DAG: A Kafka Streams application defines its processing topology as a directed acyclic graph (DAG) of processors. Each processor is a source (reads from topics), a transformation (map, filter, aggregate), or a sink (writes to topics). At runtime the topology is split into tasks, one per input partition of each sub-topology, and the tasks are distributed across instances and their stream threads; the number of input partitions therefore caps the useful parallelism. The StreamsBuilder DSL constructs this topology automatically, while the Processor API lets you build custom processor nodes with fine-grained control over state management and scheduling.
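The partition-to-task relationship can be sketched as follows. This is a deliberately simplified round-robin model, not the sticky, state-aware assignor Kafka Streams actually uses; the thread names and the two-instance, four-thread layout are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model (round-robin, NOT Kafka Streams' real assignor):
// one task per input partition of each sub-topology, spread over stream threads.
class TaskAssignmentSketch {
    static Map<String, List<String>> assign(int[] partitionsPerSubtopology, List<String> threads) {
        List<String> tasks = new ArrayList<>();
        for (int sub = 0; sub < partitionsPerSubtopology.length; sub++) {
            for (int p = 0; p < partitionsPerSubtopology[sub]; p++) {
                tasks.add(sub + "_" + p); // task IDs in <subtopology>_<partition> form
            }
        }
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String thread : threads) {
            assignment.put(thread, new ArrayList<>());
        }
        for (int i = 0; i < tasks.size(); i++) {
            assignment.get(threads.get(i % threads.size())).add(tasks.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        // raw-events has 6 partitions; two instances with 4 stream threads each = 8 threads
        List<String> threads = new ArrayList<>();
        for (String instance : List.of("app-1", "app-2")) {
            for (int t = 1; t <= 4; t++) {
                threads.add(instance + "-thread-" + t);
            }
        }
        assign(new int[] {6}, threads)
                .forEach((thread, tasks) -> System.out.println(thread + " -> " + tasks));
        // two threads get no task: parallelism is capped at 6 input partitions
    }
}
```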
Stateful Operations: Kafka Streams supports several stateful operations that require local state storage. These include aggregations (count, reduce, aggregate), joins (KStream-KTable, KStream-KStream, KTable-KTable), and windowing (tumbling, hopping, session windows). By default, state lives in a local RocksDB instance backed by a changelog topic in Kafka, so it can be rebuilt from Kafka if a node fails. The state.dir configuration determines where local state is stored, and lower-level RocksDB settings (such as background compaction threads) are tuned through a RocksDBConfigSetter registered with rocksdb.config.setter.
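The changelog mechanism reduces to "write every update to a log, rebuild by replaying it". The toy model below assumes an in-memory map in place of RocksDB and a list in place of the changelog topic; ChangelogStoreSketch and its methods are hypothetical. The compact method mimics log compaction, which keeps only the latest record per key and so shortens restores.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a changelog-backed state store (hypothetical names, not Kafka APIs):
// every put is also appended to a changelog; a replacement instance restores by replaying it.
class ChangelogStoreSketch {
    final Map<String, Long> local = new HashMap<>();   // stands in for RocksDB
    final List<Map.Entry<String, Long>> changelog;     // stands in for the changelog topic

    ChangelogStoreSketch(List<Map.Entry<String, Long>> changelog) {
        this.changelog = changelog;
    }

    void put(String key, long value) {
        local.put(key, value);
        changelog.add(Map.entry(key, value));
    }

    /** Rebuild local state after a failure by replaying the changelog in order. */
    static ChangelogStoreSketch restore(List<Map.Entry<String, Long>> changelog) {
        ChangelogStoreSketch store = new ChangelogStoreSketch(changelog);
        for (Map.Entry<String, Long> e : changelog) {
            store.local.put(e.getKey(), e.getValue());
        }
        return store;
    }

    /** Like log compaction: keep only the latest record per key, in log order. */
    static List<Map.Entry<String, Long>> compact(List<Map.Entry<String, Long>> changelog) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : changelog) {
            latest.remove(e.getKey()); // move the key to the position of its latest write
            latest.put(e.getKey(), e.getValue());
        }
        return new ArrayList<>(latest.entrySet());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> topic = new ArrayList<>();
        ChangelogStoreSketch store = new ChangelogStoreSketch(topic);
        store.put("user-A", 1); store.put("user-B", 1); store.put("user-A", 2); store.put("user-A", 3);
        ChangelogStoreSketch restored = restore(compact(topic));
        System.out.println(topic.size() + " changelog records, " + compact(topic).size() + " after compaction");
        System.out.println("restored state equals original: " + restored.local.equals(store.local));
        // 4 changelog records, 2 after compaction
        // restored state equals original: true
    }
}
```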
Windowing Strategies: Event-time windowing groups records by their event timestamp rather than by processing time. Kafka Streams supports tumbling windows (fixed-size, non-overlapping), hopping windows (fixed-size, overlapping), and session windows (activity-based, variable-size). Instead of watermarks, Kafka Streams tracks stream time, the highest timestamp seen so far, and keeps accepting late-arriving records until a window's grace period has passed. The cache.max.bytes.buffering setting sizes the record cache that absorbs repeated updates to the same key (including window results) before they are forwarded downstream and written to the changelog, and commit.interval.ms determines how often offsets are committed and that cache is flushed.
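Session windows are the least intuitive of the three, because an out-of-order record can bridge two existing sessions. The sketch below models the merge rule in plain Java: a new record at time t joins every session that ends no earlier than t - gap and starts no later than t + gap. It is our simplified reading of the behavior, not Kafka's code, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Sketch of session-window merging by inactivity gap (illustrative, not Kafka code).
class SessionMerger {
    static final class Session {
        final long start;
        final long end;
        final int count;

        Session(long start, long end, int count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }

        @Override
        public String toString() {
            return "[" + start + "-" + end + "] count=" + count;
        }
    }

    private final long gap;
    private final List<Session> sessions = new ArrayList<>();

    SessionMerger(long gap) {
        this.gap = gap;
    }

    void add(long t) {
        long start = t, end = t;
        int count = 1;
        Iterator<Session> it = sessions.iterator();
        while (it.hasNext()) {
            Session s = it.next();
            if (s.end >= t - gap && s.start <= t + gap) { // within the gap: absorb this session
                start = Math.min(start, s.start);
                end = Math.max(end, s.end);
                count += s.count;
                it.remove();
            }
        }
        sessions.add(new Session(start, end, count));
        sessions.sort(Comparator.comparingLong(x -> x.start));
    }

    List<Session> sessions() {
        return sessions;
    }

    public static void main(String[] args) {
        long[] events = {100, 150, 200, 250, 300};
        for (long gap : new long[] {50, 40}) {
            SessionMerger m = new SessionMerger(gap);
            for (long t : events) {
                m.add(t);
            }
            System.out.println("gap=" + gap + ": " + m.sessions());
        }
        // gap=50: [[100-300] count=5]
        // gap=40: five single-event sessions
    }
}
```

With five events spaced exactly 50 ms apart, a 50 ms gap merges them into one session, while a 40 ms gap leaves each event in its own session.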
Kafka Connect provides a framework for moving data between Kafka and external systems. Connectors are plugins that extend either SourceConnector (reading from external systems and writing to Kafka) or SinkConnector (reading from Kafka and writing to external systems). The Connect framework handles offset management, fault tolerance, and parallelism. Each connector instance runs one or more tasks (up to tasks.max), and the framework distributes those tasks across worker nodes. Workers run either in standalone mode (a single process that keeps offsets in a local file) or in distributed mode (a cluster of workers that store configurations, offsets, and status in Kafka topics), with distributed mode providing automatic scaling and fault tolerance.
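How a connector turns tasks.max into task configurations is up to the connector, but a common approach is to split its units of work into contiguous groups. The sketch below is modeled on the grouping behavior of Connect's ConnectorUtils.groupPartitions as we understand it; the table names and the class name are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Contiguous grouping of work units into tasks (in the spirit of ConnectorUtils.groupPartitions).
class TaskGrouping {
    /** Split work units into at most maxTasks contiguous groups, one group per task. */
    static <T> List<List<T>> group(List<T> units, int maxTasks) {
        if (maxTasks <= 0) {
            throw new IllegalArgumentException("maxTasks must be positive");
        }
        List<List<T>> groups = new ArrayList<>();
        int numGroups = Math.min(units.size(), maxTasks); // never create idle tasks
        if (numGroups == 0) {
            return groups;
        }
        int perGroup = units.size() / numGroups;
        int leftover = units.size() % numGroups; // the first groups take one extra unit
        int next = 0;
        for (int g = 0; g < numGroups; g++) {
            int size = perGroup + (g < leftover ? 1 : 0);
            groups.add(new ArrayList<>(units.subList(next, next + size)));
            next += size;
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> tables = List.of("orders", "users", "products", "payments", "shipments");
        System.out.println(group(tables, 3));        // [[orders, users], [products, payments], [shipments]]
        System.out.println(group(tables, 8).size()); // 5: one table per task, no idle tasks
    }
}
```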
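Connect Configuration and Transforms: In distributed mode, connectors are configured by submitting JSON to the REST API; in standalone mode, typically by properties files passed at startup. The framework supports Single Message Transforms (SMTs) for in-flight record transformation, converters for serialization format handling, and dead letter queues for error handling in sink connectors. SMTs are applied in the order listed in the transforms property: for source connectors they run after the connector produces a record and before the converter serializes it, and for sink connectors after the converter deserializes a record and before it reaches the connector. Common SMTs include InsertField, ReplaceField, MaskField, TimestampRouter, and Flatten. Converters translate between Connect's internal data representation (schema plus value) and the bytes stored in Kafka (JSON, Avro, Protobuf).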
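Because each SMT sees the previous one's output, both the order of the transforms list and the exact regex matter. This plain-Java model (not the Connect API) mimics RegexRouter, InsertField$Value, and MaskField$Value on schemaless map values; the class, record type, and field names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Plain-Java model of an SMT chain on schemaless records (not the Connect API).
class SmtChainSketch {
    static final class Rec {
        final String topic;
        final Map<String, Object> value;

        Rec(String topic, Map<String, Object> value) {
            this.topic = topic;
            this.value = value;
        }
    }

    /** Modeled on RegexRouter: rename only if the regex matches the WHOLE topic name. */
    static UnaryOperator<Rec> regexRouter(String regex, String replacement) {
        Pattern pattern = Pattern.compile(regex);
        return r -> {
            Matcher m = pattern.matcher(r.topic);
            return m.matches() ? new Rec(m.replaceFirst(replacement), r.value) : r;
        };
    }

    /** Modeled on InsertField$Value with a static field and value. */
    static UnaryOperator<Rec> insertStatic(String field, Object fieldValue) {
        return r -> {
            Map<String, Object> copy = new LinkedHashMap<>(r.value);
            copy.put(field, fieldValue);
            return new Rec(r.topic, copy);
        };
    }

    /** Modeled on MaskField$Value for a string field: the value becomes "". */
    static UnaryOperator<Rec> maskString(String field) {
        return r -> {
            Map<String, Object> copy = new LinkedHashMap<>(r.value);
            copy.computeIfPresent(field, (k, v) -> "");
            return new Rec(r.topic, copy);
        };
    }

    /** Apply transforms in list order; each one receives the previous one's output. */
    static Rec apply(Rec rec, List<UnaryOperator<Rec>> chain) {
        for (UnaryOperator<Rec> transform : chain) {
            rec = transform.apply(rec);
        }
        return rec;
    }

    public static void main(String[] args) {
        Rec in = new Rec("cdc.ecommerce.orders", Map.of("order_id", "o-1", "card", "4111111111111111"));
        Rec out = apply(in, List.of(
                regexRouter("cdc\\.ecommerce\\.(.*)", "cdc-$1"),
                maskString("card"),
                insertStatic("source", "mysql")));
        System.out.println(out.topic + " " + out.value);
        // A regex that matches only part of the name leaves the topic unchanged
        System.out.println(apply(in, List.of(regexRouter("ecommerce\\.(.*)", "cdc-$1"))).topic);
        // prints cdc.ecommerce.orders
    }
}
```

The second call shows why a RegexRouter pattern must cover the entire topic name: a pattern that matches only a suffix leaves the record on its original topic.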
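Exactly-Once Semantics in Connect: Since Kafka 3.3 (KIP-618), source connectors can run with exactly-once guarantees: enable exactly.once.source.support on the distributed workers, then set exactly.once.support (requested or required) and optionally transaction.boundary on the connector. The framework writes source records and their offsets in Kafka transactions; a connector can declare its support through exactlyOnceSupport() and, if it needs to, define its own transaction boundaries. Sink connectors have no framework-level switch: exactly-once delivery depends on the connector, typically through idempotent writes (such as upserts keyed by the record key) or by storing consumed offsets in the target system and controlling commits via preCommit(). Either way, records are not duplicated during task rebalancing or connector restarts.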
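For sink connectors, idempotent writes are the usual route to effectively-once results. The toy model below (hypothetical names, not the SinkTask API) replays the same batch twice, as happens when a task fails after writing but before its offsets are committed: an insert-only target duplicates rows, while an upsert keyed by record key does not.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy comparison of insert-only vs upsert writes under at-least-once redelivery.
class IdempotentSinkSketch {
    final List<String> insertOnlyTarget = new ArrayList<>();        // duplicates on replay
    final Map<String, String> upsertTarget = new LinkedHashMap<>(); // keyed by record key

    void put(List<String[]> batch) {
        for (String[] kv : batch) {
            insertOnlyTarget.add(kv[1]);
            upsertTarget.put(kv[0], kv[1]); // same key and value: a replay is a no-op
        }
    }

    public static void main(String[] args) {
        List<String[]> batch = List.of(new String[] {"order-1", "paid"}, new String[] {"order-2", "shipped"});
        IdempotentSinkSketch sink = new IdempotentSinkSketch();
        sink.put(batch); // written, but the task crashes before offsets are committed
        sink.put(batch); // after restart the same batch is redelivered
        System.out.println("insert-only rows: " + sink.insertOnlyTarget.size()); // 4
        System.out.println("upserted docs:    " + sink.upsertTarget.size());     // 2
    }
}
```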
Key Concepts Table
| Component | Description | Key Configurations | Use Cases |
|---|---|---|---|
| KStream | Unbounded stream of records | processing.guarantee | Event processing, real-time analytics |
| KTable | Changelog stream (latest value per key) | state.dir, cache.max.bytes.buffering | Materialized views, join tables |
| GlobalKTable | Full copy on every instance | global.consumer.* overrides | Reference data, dimension tables |
| Topology | DAG of processors | application.id, client.id | Processing pipeline definition |
| State Store | Local persistence (RocksDB by default) | rocksdb.config.setter | Stateful operations, windowing |
| Changelog Topic | Fault-tolerant state backup | replication.factor, topic.* overrides | State recovery, fault tolerance |
| Source Connector | Read from external systems | tasks.max, connector.class | CDC, file ingestion |
| Sink Connector | Write to external systems | topics, tasks.max | Data export, search indexing |
| Converter | Serialization format handling | key.converter, value.converter | JSON, Avro, Protobuf |
| SMT | Single Message Transform | transforms, transforms.* | Record modification, routing |
Code Examples
Kafka Streams DSL Application
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.WindowStore;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;

// Order (getAmount, getType, getUserId, withUser, applyDiscount, toJson) is an
// application-defined domain class assumed by this example; it is not shown here.
public class OrderProcessingStream {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processing-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        // Exactly-once semantics
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
                StreamsConfig.EXACTLY_ONCE_V2);
        // State store configuration
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
        // Thread and task configuration
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);

        Serde<Order> orderSerde = orderSerde();
        StreamsBuilder builder = new StreamsBuilder();

        // Source: raw order events keyed by user ID. The default timestamp extractor uses
        // each record's embedded timestamp, so the windows below are event-time windows.
        KStream<String, String> orders = builder.stream("raw-orders",
                Consumed.with(Serdes.String(), Serdes.String()));

        // Source: user data (KTable for joins)
        KTable<String, String> users = builder.table("user-data",
                Consumed.with(Serdes.String(), Serdes.String()));

        // Transform and enrich (stream-table join on the user ID key)
        KStream<String, Order> enrichedOrders = orders
                .filter((key, value) -> value != null && !value.isEmpty())
                .mapValues(value -> parseOrder(value))
                .filter((key, order) -> order.getAmount() > 0)
                .join(users,
                        (order, user) -> order.withUser(user),
                        Joined.with(Serdes.String(), orderSerde, Serdes.String()));

        // Branch by order type: split() replaces the deprecated KStream#branch.
        // Map keys are the split prefix plus branch name, e.g. "orders-premium".
        Map<String, KStream<String, Order>> branches = enrichedOrders
                .split(Named.as("orders-"))
                .branch((key, order) -> order.getType().equals("PREMIUM"),
                        Branched.withFunction(stream -> {
                            KStream<String, Order> discounted = stream.mapValues(o -> o.applyDiscount(0.10));
                            discounted.to("premium-orders", Produced.with(Serdes.String(), orderSerde));
                            return discounted;
                        }, "premium"))
                .branch((key, order) -> order.getType().equals("STANDARD"),
                        Branched.withFunction(stream -> {
                            KStream<String, Order> standard = stream.mapValues(o -> o.applyDiscount(0.0));
                            standard.to("standard-orders", Produced.with(Serdes.String(), orderSerde));
                            return standard;
                        }, "standard"))
                .noDefaultBranch();

        // Aggregation: count orders per user in 1-hour tumbling windows
        KTable<Windowed<String>, Long> orderCounts = enrichedOrders
                .groupByKey(Grouped.with(Serdes.String(), orderSerde))
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
                .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("order-count-store")
                        .withValueSerde(Serdes.Long()));

        // Aggregation: total revenue per user in 5-minute hopping windows advancing every minute
        KTable<Windowed<String>, Double> revenueByUser = enrichedOrders
                .mapValues(order -> order.getAmount())
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))
                        .advanceBy(Duration.ofMinutes(1)))
                .reduce(Double::sum, Materialized.<String, Double, WindowStore<Bytes, byte[]>>as("revenue-store")
                        .withValueSerde(Serdes.Double()));

        // Session windowing for activity tracking. map() changes the key, so the stream is
        // repartitioned and needs explicit serdes for the Order values.
        KTable<Windowed<String>, Long> sessionCounts = enrichedOrders
                .map((key, order) -> KeyValue.pair(order.getUserId(), order))
                .groupByKey(Grouped.with(Serdes.String(), orderSerde))
                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
                .count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("session-store")
                        .withValueSerde(Serdes.Long()));

        // Sink: write enriched orders as JSON strings
        enrichedOrders
                .mapValues(order -> order.toJson())
                .to("enriched-orders", Produced.with(Serdes.String(), Serdes.String()));

        Topology topology = builder.build();
        System.out.println(topology.describe());
        KafkaStreams streams = new KafkaStreams(topology, props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }

    private static Order parseOrder(String json) {
        // JSON parsing implementation
        return new Order(); // Placeholder
    }

    private static Serde<Order> orderSerde() {
        // Placeholder: return a JSON- or Avro-based Serde for the application's Order class
        throw new UnsupportedOperationException("Serde<Order> not implemented");
    }
}
Kafka Connect Source Connector Configuration
{
  "name": "mysql-cdc-source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql-primary.example.com",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "${file:/opt/kafka/secrets/mysql.properties:password}",
    "database.server.id": "184054",
    "database.server.name": "ecommerce",
    "database.include.list": "ecommerce",
    "table.include.list": "ecommerce.orders,ecommerce.users,ecommerce.products",
    "topic.prefix": "cdc",
    "topic.creation.default.replication.factor": 3,
    "topic.creation.default.partitions": 12,
    "topic.creation.default.cleanup.policy": "delete",
    "topic.creation.default.retention.ms": 604800000,
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "transforms": "route,unwrap,addTimestamp",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "cdc\\.ecommerce\\.(.*)",
    "transforms.route.replacement": "cdc-$1",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields": "op,ts_ms,source.ts_ms",
    "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addTimestamp.timestamp.field": "processed_at",
    "heartbeat.interval.ms": "30000",
    "poll.interval.ms": "1000",
    "max.batch.size": "8192",
    "snapshot.mode": "initial",
    "snapshot.locking.mode": "none",
    "snapshot.select.statement.overrides": "ecommerce.orders",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "tasks.max": "1"
  }
}
With topic.prefix set to cdc, Debezium names the table topics cdc.ecommerce.orders and so on, and RegexRouter only renames topics whose whole name matches, so the route pattern includes the prefix (cdc.ecommerce.orders becomes cdc-orders). The Debezium MySQL connector reads a single binlog and always runs one task, hence tasks.max is 1. Dead letter queue settings are omitted because Connect's DLQ applies only to sink connectors; errors.tolerance=all with logging still covers converter and transform failures.
Kafka Connect Sink Connector Configuration
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "connection.username": "${file:/opt/kafka/secrets/es.properties:username}",
    "connection.password": "${file:/opt/kafka/secrets/es.properties:password}",
    "topics": "enriched-orders,enriched-users,enriched-products",
    "key.ignore": false,
    "schema.ignore": false,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "transforms": "indexByDate,typeRoute",
    "transforms.indexByDate.type": "org.apache.kafka.connect.transforms.TimestampRouter",
    "transforms.indexByDate.timestamp.format": "yyyy-MM-dd",
    "transforms.indexByDate.topic.format": "${topic}-${timestamp}",
    "transforms.typeRoute.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.typeRoute.regex": "enriched-(.*)",
    "transforms.typeRoute.replacement": "kafka-$1",
    "batch.size": 200,
    "max.buffered.records": 500,
    "linger.ms": 1,
    "flush.timeout.ms": 10000,
    "max.retries": 5,
    "retry.backoff.ms": 100,
    "behavior.on.null.values": "delete",
    "behavior.on.malformed.documents": "warn",
    "write.method": "upsert",
    "document.id.strategy": "record_key",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq-elasticsearch",
    "errors.deadletterqueue.topic.replication.factor": 3,
    "tasks.max": "3"
  }
}
The StringConverter already yields a plain string key that serves as the document ID (ExtractField$Key works only on struct or map keys). TimestampRouter formats ${timestamp} with java.text.SimpleDateFormat, so the pattern uses yyyy (calendar year); YYYY is the week-based year and mislabels indices around New Year. After both routers, enriched-orders lands in daily indices such as kafka-orders-<date>.
Kafka Streams with Processor API (Custom Processor)
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// OrderEvent, Alert and UserTransactionSummary are application-defined classes (not shown).
// The "user-transaction-summary" and "recent-orders-window" stores must be added to the
// topology and connected to this processor.
public class FraudDetectionProcessor implements Processor<String, OrderEvent, String, Alert> {
    private ProcessorContext<String, Alert> context;
    private KeyValueStore<String, UserTransactionSummary> stateStore;
    private WindowStore<String, OrderEvent> recentOrders;

    @Override
    public void init(ProcessorContext<String, Alert> context) {
        this.context = context;
        this.stateStore = context.getStateStore("user-transaction-summary");
        this.recentOrders = context.getStateStore("recent-orders-window");
        // Cleanup runs on stream time because the window store is indexed by event time
        context.schedule(Duration.ofMinutes(5),
                PunctuationType.STREAM_TIME,
                this::cleanupOldOrders);
        context.schedule(Duration.ofMinutes(1),
                PunctuationType.WALL_CLOCK_TIME,
                this::checkVelocity);
    }

    @Override
    public void process(Record<String, OrderEvent> record) {
        OrderEvent event = record.value();
        String userId = event.getUserId();
        long eventTime = event.getTimestamp();
        // Store in the window store keyed by user ID, so velocity checks can fetch per user
        recentOrders.put(userId, event, eventTime);
        // Get or initialize user summary
        UserTransactionSummary summary = stateStore.get(userId);
        if (summary == null) {
            summary = new UserTransactionSummary(userId);
        }
        // Fraud detection logic: if several rules fire, the last one wins
        Alert alert = null;
        // Rule 1: Amount threshold
        if (event.getAmount() > summary.getAverageAmount() * 3) {
            alert = new Alert(userId, "HIGH_AMOUNT",
                    "Transaction amount 3x above average", event);
        }
        // Rule 2: Velocity check (5-minute count vs. 5x the expected 5-minute count)
        long recentCount = countRecentTransactions(userId, eventTime, Duration.ofMinutes(5));
        double expectedIn5Minutes = summary.getTransactionsPerMinute() * 5;
        if (recentCount > expectedIn5Minutes * 5) {
            alert = new Alert(userId, "HIGH_VELOCITY",
                    "Transaction velocity 5x above normal", event);
        }
        // Rule 3: Geographic anomaly
        if (summary.getLastLocation() != null &&
                !summary.getLastLocation().equals(event.getLocation())) {
            alert = new Alert(userId, "GEO_ANOMALY",
                    "Location changed from " + summary.getLastLocation() +
                    " to " + event.getLocation(), event);
        }
        // Update summary
        summary.update(event);
        stateStore.put(userId, summary);
        // Forward alert if fraud detected
        if (alert != null) {
            context.forward(new Record<>(userId, alert, record.timestamp()));
        }
    }

    // Counts this user's orders in [eventTime - window, eventTime] (event time, not wall clock)
    private long countRecentTransactions(String userId, long eventTime, Duration window) {
        Instant to = Instant.ofEpochMilli(eventTime);
        Instant from = to.minus(window);
        long count = 0;
        // fetch(key, from, to) returns a WindowStoreIterator keyed by window start time
        try (WindowStoreIterator<OrderEvent> iterator = recentOrders.fetch(userId, from, to)) {
            while (iterator.hasNext()) {
                iterator.next();
                count++;
            }
        }
        return count;
    }

    // Punctuator callbacks receive the punctuation time in epoch milliseconds
    private void cleanupOldOrders(long streamTime) {
        long cutoff = streamTime - Duration.ofHours(1).toMillis();
        List<Windowed<String>> expired = new ArrayList<>();
        try (KeyValueIterator<Windowed<String>, OrderEvent> iterator = recentOrders.all()) {
            while (iterator.hasNext()) {
                KeyValue<Windowed<String>, OrderEvent> entry = iterator.next();
                if (entry.key.window().end() < cutoff) {
                    expired.add(entry.key);
                }
            }
        }
        // WindowStore has no delete(); putting null for a key and window start removes the
        // entry. The store's retention period also drops old segments automatically.
        for (Windowed<String> key : expired) {
            recentOrders.put(key.key(), null, key.window().start());
        }
    }

    private void checkVelocity(long timestamp) {
        // Periodic velocity check logic
    }

    @Override
    public void close() {}
}
Performance Metrics
| Metric | Kafka Streams (single instance) | Kafka Streams (4 instances) | Kafka Connect (distributed) |
|---|---|---|---|
| Throughput (msg/sec) | 100K | 400K | 300K |
| Latency (p99) | 10ms | 20ms | 50ms |
| State Store Size | 1GB | 1GB per instance | N/A |
| Memory (JVM Heap) | 4GB | 4GB per instance | 8GB per worker |
| CPU Usage | 40% | 30% per instance | 50% per worker |
| Changelog Topic Throughput | 50K msg/sec | 200K msg/sec | N/A |
| Rebalance Time | 30s | 60s | 120s |
| Startup Time | 10s | 15s | 30s |
Best Practices
- Application ID and Instance Management: Use a unique application.id per stream processing application, and give each instance a unique client.id for monitoring. Scale by adding instances with the same application.id, which join the same consumer group; stream threads beyond the number of tasks (one per input partition) sit idle.
- State Store Optimization: Configure cache.max.bytes.buffering to balance memory usage against write amplification. Tune RocksDB (block cache size, bloom filters, write buffer size) by implementing a RocksDBConfigSetter and registering it with rocksdb.config.setter.
- Windowing Configuration: Choose the window type based on the use case: tumbling for fixed intervals, hopping for overlapping analysis, session for activity tracking. Configure grace periods to handle late events, and use TimeWindows.ofSizeWithNoGrace() when late events should be dropped as soon as a window ends.
- Exactly-Once Semantics: Enable processing.guarantee=exactly_once_v2 for critical applications. Expect higher latency and lower throughput: output becomes visible to read_committed consumers only when a transaction commits, and commit.interval.ms defaults to 100 ms in this mode. Use idempotent sink operations and transactional producers.
- Connect Task Scaling: Set tasks.max from the available parallelism: the number of input topic partitions for sink connectors, and the external system's units of work (tables, shards, files) for source connectors, within what the external system can absorb. Monitor connector health via the REST API. For sink connectors, use dead letter queues instead of failing tasks.
- Converter Selection: Use Avro with Schema Registry for schema evolution and compatibility. JSON converters are simpler but lack schema enforcement. Protobuf is efficient but requires additional tooling.
- SMT Ordering: SMTs run in the order listed in transforms, and each one sees the output of the previous one; for example, a RegexRouter must match the topic name as left by any earlier transform. Limit the number of SMTs to avoid per-record overhead, and write custom SMTs only when built-in options are insufficient.
- Monitoring: Track commit-latency-avg, poll-latency-avg, and process-latency-avg for Streams, and connector-failed-task-count and connector-total-task-count for Connect. Use Confluent Control Center or JMX metrics.
- Error Handling: Use errors.tolerance=all with dead letter queues (sink connectors) for non-critical failures. Implement circuit breakers for external system dependencies. Monitor DLQ size to detect systemic issues.
- Testing: Use TopologyTestDriver for unit testing Kafka Streams topologies. Use Testcontainers for integration testing with real Kafka and external systems. Test with realistic data volumes and edge cases.
Key Takeaways:
- Kafka Streams runs embedded in the application, so no separate cluster is needed; scale by adding instances with the same application.id (they share one consumer group)
- End-to-end latency = produce + poll + process + commit; optimize each component for real-time requirements
- Tumbling windows partition time into non-overlapping buckets; hopping windows allow overlap; session windows are activity-based
- KTable provides latest-value-per-key semantics for joins and materialized views
- Exactly-once via processing.guarantee=exactly_once_v2 handles idempotent production + transactional offsets automatically
- Kafka Connect handles offset management and parallelism; set tasks.max from input topic partitions (sink connectors) or the source system's units of work (source connectors)
See also: Data Engineering Pipeline patterns (data-engineering/019) | PySpark Structured Streaming (pyspark/11-structured-streaming)