Kafka Connect: Exactly-Once Semantics and Single Message Transforms
Difficulty: Senior | Asked at: Confluent, LinkedIn, Airbnb, Stripe
ℹ️ Interview Context
Kafka Connect is the standard framework for integrating Kafka with external systems. Interviewers expect you to understand the architecture, exactly-once semantics, and how to customize behavior with SMTs.
The Question
Explain how Kafka Connect achieves exactly-once semantics. How do Single Message Transforms (SMTs) work? What's the difference between source and sink connector exactly-once delivery?
Kafka Connect Architecture
┌─────────────────────────────────────────────────────────┐
│                  Kafka Connect Cluster                  │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ┌─────────────┐   ┌─────────────┐   ┌─────────────┐    │
│  │  Worker 1   │   │  Worker 2   │   │  Worker 3   │    │
│  │             │   │             │   │             │    │
│  │ ┌─────────┐ │   │ ┌─────────┐ │   │ ┌─────────┐ │    │
│  │ │ Source  │ │   │ │  Sink   │ │   │ │ Source  │ │    │
│  │ │  Task   │ │   │ │  Task   │ │   │ │  Task   │ │    │
│  │ └─────────┘ │   │ └─────────┘ │   │ └─────────┘ │    │
│  └─────────────┘   └─────────────┘   └─────────────┘    │
│                                                         │
│  ┌───────────────────────────────────────────────────┐  │
│  │ Distributed Mode                                  │  │
│  │  - Group protocol (group.id) assigns tasks        │  │
│  │  - Config storage in Kafka topics                 │  │
│  │  - Offset storage in Kafka topics                 │  │
│  │  - Status storage in Kafka topics                 │  │
│  └───────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────┘
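Workers in distributed mode share the cluster's tasks, and when a worker joins or leaves, the group rebalances. The sketch below is a deliberately simplified round-robin model of that distribution; the real Connect assignor is incremental and cooperative (it also places connector instances and tries to minimize task movement), and the connector names and helper function here are hypothetical.

```python
from collections import defaultdict


def assign_round_robin(workers, connectors):
    """
    Simplified model: spread every connector's tasks over the live workers.
    Real Connect uses an incremental cooperative assignor that also places
    connector instances and minimizes task movement on rebalance.
    """
    # Task IDs follow the "<connector>-<n>" convention
    tasks = [f"{name}-{i}" for name, tasks_max in connectors.items() for i in range(tasks_max)]
    assignment = defaultdict(list)
    for i, task in enumerate(tasks):
        assignment[workers[i % len(workers)]].append(task)
    return dict(assignment)


connectors = {"db-source": 1, "pg-sink": 1, "api-source": 1}  # hypothetical connectors
before = assign_round_robin(["worker-1", "worker-2", "worker-3"], connectors)
# worker-3 leaves the group: its task moves to a surviving worker
after = assign_round_robin(["worker-1", "worker-2"], connectors)
```

The point the model captures is that tasks, not connectors, are the unit of parallelism: losing a worker only redistributes its tasks.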
Connect Worker Configuration
# worker.properties (distributed mode)
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
group.id=connect-cluster
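# Allow connectors to override client settings (producer.override.*, consumer.override.*)
connector.client.config.override.policy=All
# Storage
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
# Replication
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
# Exactly-once source support (KIP-618, Kafka 3.3+); sink connectors are unaffected.
# Roll out as "preparing" on every worker first, then switch to "enabled".
exactly.once.source.support=enabled
# Plugins (tasks.max is a per-connector setting, not a worker setting)
plugin.path=/usr/share/java,/usr/share/kafka/plugins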
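The worker flag only makes exactly-once available; each source connector opts in through its own configuration. A minimal sketch of a `POST /connectors` request body, assuming the KIP-618 connector properties `exactly.once.support` and `transaction.boundary`; the helper function and `com.example.DatabaseSourceConnector` are hypothetical. Downstream consumers also need `isolation.level=read_committed` so they skip aborted transactions.

```python
import json
from typing import Optional


def eos_source_connector_config(name: str, connector_class: str,
                                extra: Optional[dict] = None) -> dict:
    """
    Build a POST /connectors body that asks for exactly-once source delivery.
    Assumes every worker already runs with exactly.once.source.support=enabled.
    """
    config = {
        "connector.class": connector_class,
        # "required" makes validation fail if the connector cannot support EOS,
        # instead of silently running at-least-once ("requested" is the default)
        "exactly.once.support": "required",
        # One producer transaction per poll() batch (the default boundary)
        "transaction.boundary": "poll",
    }
    config.update(extra or {})  # connector-specific settings, e.g. tasks.max
    return {"name": name, "config": config}


# com.example.DatabaseSourceConnector is a hypothetical connector class
body = eos_source_connector_config(
    "db-source", "com.example.DatabaseSourceConnector", {"tasks.max": "3"}
)
print(json.dumps(body, indent=2))
```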
Source Connector Exactly-Once
How Source Exactly-Once Works
Source Connector Exactly-Once Flow:
1. Source Task polls external system
2. Source Task produces messages to Kafka
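3. The worker periodically commits the task's source offsets to Kafka (offset.flush.interval.ms), separately from the records
4. If worker crashes:
- New worker reads committed offsets from Kafka
- Resumes from last committed offset
- Records produced after that offset are replayed (at-least-once)
5. With exactly-once (KIP-618, Kafka 3.3+):
- The worker gives each task a transactional producer
- Records and source offsets are written in the same Kafka transaction
- A crash aborts the open transaction, so read_committed consumers see no duplicates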
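The difference between steps 4 and 5 comes down to whether records and offsets can become visible separately. A toy simulation, assuming a crash in the middle of the second batch and (for simplicity) an offset flush after every batch rather than on a timer; all names are hypothetical and no Kafka client is involved.

```python
def simulate(rows, batch_size, crash_at_batch, transactional):
    """Return the records a read_committed consumer ends up seeing."""
    topic = []             # committed records in the Kafka topic
    state = {"offset": 0}  # committed source offset: rows already delivered

    def run(crash_at):
        pos, batch_no = state["offset"], 0
        while pos < len(rows):
            batch = rows[pos:pos + batch_size]
            if transactional:
                if batch_no == crash_at:
                    return  # crash before commit: the transaction aborts, nothing is visible
                topic.extend(batch)                 # records and offset
                state["offset"] = pos + len(batch)  # commit together
            else:
                topic.extend(batch)                 # records are already durable...
                if batch_no == crash_at:
                    return                          # ...but the offset flush never happened
                state["offset"] = pos + len(batch)
            pos, batch_no = pos + len(batch), batch_no + 1

    run(crash_at_batch)  # first task crashes mid-stream
    run(None)            # restarted task resumes from the committed offset
    return topic


rows = [f"row-{i}" for i in range(5)]
at_least_once = simulate(rows, batch_size=2, crash_at_batch=1, transactional=False)
exactly_once = simulate(rows, batch_size=2, crash_at_batch=1, transactional=True)
```

In the at-least-once run, `row-2` and `row-3` appear twice; in the transactional run, each row appears exactly once.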
Source Task Implementation
# Source task with exactly-once: a runnable Python sketch of the Java
# SourceTask API (real connectors extend org.apache.kafka.connect.source.SourceTask).
# sqlite3 stands in for the JDBC connection.
import sqlite3
from dataclasses import dataclass
from typing import Any


@dataclass
class SourceRecord:
    source_partition: dict  # which stream this came from (sourcePartition in Java)
    source_offset: dict     # position to resume from (sourceOffset in Java)
    topic: str
    key: Any
    value: Any


class SourceTask:
    """Stand-in for the framework base class."""
    def __init__(self, offset_reader=None):
        # Java: context.offsetStorageReader().offset(partition)
        self.offset_reader = offset_reader or (lambda partition: None)


class DatabaseSourceTask(SourceTask):
    SOURCE_PARTITION = {'table': 'events'}

    def __init__(self, offset_reader=None):
        super().__init__(offset_reader)
        self.connector_config = None
        self.db_connection = None
        self.last_offset = 0

    def start(self, config):
        """
        Initialize source task.
        Config includes:
        - connection.url
        - topic
        - mode (bulk, timestamp, incrementing)
        - timestamp.column.name
        - incrementing.column.name
        """
        self.connector_config = config
        self.db_connection = sqlite3.connect(config['connection.url'])
        self.db_connection.row_factory = sqlite3.Row  # rows addressable by column name
        # Resume from the offset the framework last committed for this partition
        stored = self.offset_reader(self.SOURCE_PARTITION)
        self.last_offset = stored['id'] if stored else 0

    def poll(self):
        """
        Poll the external system and return a list of SourceRecords.
        Each record carries a source partition and offset so the framework
        can commit the resume position (atomically with the records in
        exactly-once mode).
        """
        # Parameterized query: never format values into SQL strings
        rows = self.db_connection.execute(
            "SELECT * FROM events WHERE id > ? ORDER BY id LIMIT 1000",
            (self.last_offset,),
        ).fetchall()
        records = [
            SourceRecord(
                source_partition=self.SOURCE_PARTITION,
                source_offset={'id': row['id']},
                topic=self.connector_config['topic'],
                key={'id': row['id']},
                value=dict(row),
            )
            for row in rows
        ]
        if rows:
            self.last_offset = rows[-1]['id']
        return records

    def stop(self):
        """Clean up resources"""
        if self.db_connection:
            self.db_connection.close()
Source Offset Storage
# Source offsets live in the connect-offsets topic (log-compacted)
#   Key:   [connector name, sourcePartition]
#   Value: sourceOffset
# Example:
#   Key:   ["my-source", {"table": "events"}]
#   Value: {"id": 12345, "timestamp": 1690000000000}
# On restart, the task asks the framework for the latest offset of its
# partition (context.offsetStorageReader() in Java) and resumes from there.
import json


class OffsetManager:
    """
    In-memory model of the offsets topic. In a real worker the framework,
    not the task, commits offsets: periodically (offset.flush.interval.ms)
    in at-least-once mode, or inside the producer transaction together
    with the produced records in exactly-once mode.
    """

    def __init__(self, connector_name):
        self.connector_name = connector_name
        self._log = {}  # compacted topic: the latest value per key wins

    def _key(self, source_partition):
        return json.dumps([self.connector_name, source_partition], sort_keys=True)

    def commit_offset(self, source_partition, source_offset):
        self._log[self._key(source_partition)] = json.dumps(source_offset)

    def load_offset(self, source_partition):
        """
        Load last committed offset.
        Returns None if no offset exists (first run).
        """
        raw = self._log.get(self._key(source_partition))
        return json.loads(raw) if raw is not None else None
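ℹ️ Source Exactly-Once
Source connector exactly-once requires a replayable source: the connector must emit source offsets (an incrementing ID, a change-log position) from which it can resume precisely, and every worker must run with exactly.once.source.support=enabled. A source without such a stable position (for example, a non-unique timestamp column or a push-only API) can only give at-least-once delivery.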
Sink Connector Exactly-Once
How Sink Exactly-Once Works
Sink Connector Exactly-Once Flow:
1. Consumer polls messages from Kafka
2. Sink Task writes to external system
3. Consumer commits offsets to Kafka
4. If worker crashes:
- New worker reads committed offsets
- Resumes from last committed offset
- May re-process some messages (at-least-once)
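5. With exactly-once:
- Connect has no built-in sink transactions: a Kafka transaction cannot include the external system
- Option A: idempotent writes (upsert by a unique key), so redelivered records overwrite themselves
- Option B: the task stores the consumed Kafka offsets in the external system in the same transaction as the data, and rewinds to them on restart (SinkTaskContext.offset())
- Either way, a failed write leaves offsets uncommitted and the records are redelivered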
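Because the sink writes first and commits offsets second, a crash in between always causes redelivery; what differs is whether the target absorbs it. A toy model assuming the task dies after writing three records and before any offset commit; the function and data are hypothetical and no database is involved.

```python
def run_sink(messages, crash_after, idempotent):
    """
    Toy sink: each record is written to the target first; the consumer offset
    would be committed afterwards. The task crashes after writing
    `crash_after` records but before any offset commit, so the restarted
    task re-reads everything from offset 0.
    """
    rows = []    # append-only target (plain INSERT)
    by_key = {}  # keyed target (UPSERT on id)

    def write(msg):
        if idempotent:
            by_key[msg["id"]] = msg
        else:
            rows.append(msg)

    for msg in messages[:crash_after]:       # first attempt, then crash
        write(msg)
    committed_offset = 0                     # never advanced: the crash came first
    for msg in messages[committed_offset:]:  # redelivery after restart
        write(msg)
    return len(by_key) if idempotent else len(rows)


messages = [{"id": i, "amount": 10 * i} for i in range(4)]
naive_count = run_sink(messages, crash_after=3, idempotent=False)
upsert_count = run_sink(messages, crash_after=3, idempotent=True)
```

The append-only target ends up with 7 rows for 4 messages; the keyed target still holds 4.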
Sink Task Implementation
# Python sketch of a Connect sink task; real tasks extend the Java class
# org.apache.kafka.connect.sink.SinkTask
from dataclasses import dataclass
from typing import Any

import psycopg2
from psycopg2.extras import Json


class SinkTask:
    """Stand-in for the framework base class."""


class DatabaseSinkTask(SinkTask):
    def __init__(self):
        self.config = None
        self.db_connection = None
        self.batch_size = 1000

    def start(self, config):
        self.config = config
        # psycopg2 expects a libpq DSN/URL (postgresql://...), not a JDBC URL
        self.db_connection = psycopg2.connect(config['connection.url'])

    def put(self, records):
        """
        Process sink records.
        The framework calls put() with each batch it consumed and commits
        consumer offsets later (after flush). If put() raises, the batch is
        retried (RetriableException) or the task fails; either way the
        offsets stay uncommitted and the records are redelivered.
        """
        # Batch insert for performance
        for i in range(0, len(records), self.batch_size):
            batch = records[i:i + self.batch_size]
            self._insert_batch(batch)

    def _insert_batch(self, records):
        """
        Insert a batch of records in one DB transaction.
        INSERT ... ON CONFLICT makes redelivered records harmless.
        """
        query = """
            INSERT INTO events (id, data, timestamp)
            VALUES (%s, %s, to_timestamp(%s / 1000.0))
            ON CONFLICT (id) DO UPDATE
            SET data = EXCLUDED.data,
                timestamp = EXCLUDED.timestamp
        """
        values = [
            # Json() adapts the dict value for a json/jsonb column;
            # record.timestamp is epoch milliseconds
            (r.key['id'], Json(r.value), r.timestamp)
            for r in records
        ]
        # `with conn` commits on success and rolls back on exception
        with self.db_connection:
            with self.db_connection.cursor() as cursor:
                cursor.executemany(query, values)

    def stop(self):
        if self.db_connection:
            self.db_connection.close()


@dataclass
class SinkRecord:
    topic: str
    partition: int
    offset: int
    key: Any
    value: Any
    timestamp: int  # epoch milliseconds
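The idempotency claim for `INSERT ... ON CONFLICT` can be checked without a PostgreSQL server. A small sketch using the standard-library `sqlite3` module as a stand-in (SQLite 3.24+ accepts the same `ON CONFLICT (id) DO UPDATE` form); the table layout mirrors the query above, and the record dicts are hypothetical.

```python
import json
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, data TEXT, timestamp INTEGER)")

UPSERT = """
    INSERT INTO events (id, data, timestamp) VALUES (?, ?, ?)
    ON CONFLICT (id) DO UPDATE
    SET data = excluded.data, timestamp = excluded.timestamp
"""


def insert_batch(conn, records):
    # One transaction per batch: commit on success, roll back on error
    with conn:
        conn.executemany(UPSERT, [(r["id"], json.dumps(r["data"]), r["ts"]) for r in records])


batch = [
    {"id": 1, "data": {"v": 1}, "ts": 1690000000000},
    {"id": 2, "data": {"v": 2}, "ts": 1690000000001},
]
insert_batch(db, batch)
insert_batch(db, batch)  # redelivery of the same batch after a crash
count = db.execute("SELECT COUNT(*) FROM events").fetchone()[0]
```

The replayed batch overwrites the existing rows instead of adding new ones.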
Sink Offset Commit Strategy
from psycopg2.extras import Json


class ExactlyOnceSinkTask(DatabaseSinkTask):
    """
    Effectively-once sink via idempotent writes.
    Kafka transactions cannot include an external database, so the
    framework cannot commit consumer offsets atomically with the write.
    Instead:
    1. put() writes the batch in a single database transaction
    2. The framework commits consumer offsets afterwards (flush/preCommit)
    3. A crash between 1 and 2 redelivers the batch, and the upsert
       turns the replay into a harmless overwrite
    """

    def put(self, records):
        """Write the batch atomically; raising makes the framework retry or fail the task."""
        # `with conn` commits on success and rolls back on exception
        with self.db_connection:
            with self.db_connection.cursor() as cursor:
                self._write_to_database(cursor, records)
        # Consumer offsets are committed later by the framework, not here

    def _write_to_database(self, cursor, records):
        """
        Write records to database.
        Must be idempotent (use UPSERT/INSERT ON CONFLICT)
        to handle retries and redeliveries safely.
        """
        for record in records:
            self._upsert_record(cursor, record)

    def _upsert_record(self, cursor, record):
        """Insert or update single record, keyed by id"""
        query = """
            INSERT INTO events (id, payload, created_at)
            VALUES (%s, %s, to_timestamp(%s / 1000.0))
            ON CONFLICT (id) DO UPDATE
            SET payload = EXCLUDED.payload,
                updated_at = NOW()
        """
        cursor.execute(query, (
            record.key['id'],
            Json(record.value),
            record.timestamp
        ))
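⚠️ Sink Exactly-Once Limitation
Connect has no built-in exactly-once for sink connectors: consumer offsets are committed after the write to the external system, so a crash in between redelivers records. End-to-end exactly-once requires either idempotent writes (upserts keyed by a unique ID) or storing the Kafka offsets in the external system in the same transaction as the data.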
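The second option removes the redelivery window altogether: the external system becomes the source of truth for offsets. A minimal sketch with `sqlite3` standing in for the sink database; the table names and the `OffsetsInSinkTask` class are hypothetical. In a Java task, the offset returned by `open()` here would be handed to `SinkTaskContext.offset()` when partitions are assigned.

```python
import sqlite3


def make_db():
    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, payload TEXT)")
    db.execute(
        "CREATE TABLE kafka_offsets ("
        " topic TEXT, part INTEGER, next_offset INTEGER,"
        " PRIMARY KEY (topic, part))"
    )
    return db


class OffsetsInSinkTask:
    """Hypothetical sink task that commits data and Kafka offsets in one DB transaction."""

    def __init__(self, db):
        self.db = db

    def open(self, topic, partition):
        """Offset to resume from, read from the sink itself (0 on first run)."""
        row = self.db.execute(
            "SELECT next_offset FROM kafka_offsets WHERE topic = ? AND part = ?",
            (topic, partition),
        ).fetchone()
        return row[0] if row else 0

    def put(self, topic, partition, records):
        """records: list of (kafka_offset, id, payload) tuples."""
        if not records:
            return
        with self.db:  # one transaction: commit on success, rollback on error
            self.db.executemany(
                "INSERT INTO events (id, payload) VALUES (?, ?)",  # plain INSERT, no upsert
                [(rec_id, payload) for _, rec_id, payload in records],
            )
            self.db.execute(
                "INSERT OR REPLACE INTO kafka_offsets VALUES (?, ?, ?)",
                (topic, partition, records[-1][0] + 1),
            )


log = [(0, 1, "a"), (1, 2, "b"), (2, 3, "c"), (3, 4, "d")]  # (offset, id, payload)
db = make_db()
task = OffsetsInSinkTask(db)
start = task.open("events", 0)
task.put("events", 0, log[start:start + 2])  # offsets 0-1 stored, then the worker crashes
restarted = OffsetsInSinkTask(db)           # Kafka's committed offset may still be 0...
resume_at = restarted.open("events", 0)     # ...but the sink says 2
restarted.put("events", 0, log[resume_at:])
```

Even with a plain `INSERT`, no primary-key conflict occurs, because the restarted task never re-reads records whose offsets were committed with the data.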
Single Message Transforms (SMTs)
How SMTs Work
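Message Flow with SMTs:
Source connector → SMT 1 → SMT 2 → SMT 3 → Kafka topic
Kafka topic → SMT 1 → SMT 2 → SMT 3 → Sink connector
Each SMT transforms one message at a time:
- Can modify key, value, headers
- Can add/remove fields
- Can change topic
- Can drop the record by returning null (e.g. the Filter SMT with a predicate)
- Cannot access other messages (stateless)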
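Conceptually, an SMT chain is just function composition over single records, with `None` (Java `null`) meaning "drop". A sketch under that model: records are plain dicts rather than Connect's `ConnectRecord`, and the three transforms are hypothetical.

```python
from typing import Callable, Optional

Record = dict  # {"topic": ..., "key": ..., "value": {...}, "headers": {...}}
Transform = Callable[[Record], Optional[Record]]


def apply_chain(record: Record, transforms: list) -> Optional[Record]:
    """Apply transforms in order; a transform returning None drops the record."""
    for transform in transforms:
        record = transform(record)
        if record is None:
            return None
    return record


def add_field(record):
    # Return a new record; never mutate the input
    return {**record, "value": {**record["value"], "source": "db"}}


def drop_deleted(record):
    return None if record["value"].get("deleted") else record


def route_to_backup(record):
    return {**record, "topic": "backup-" + record["topic"]}


chain = [add_field, drop_deleted, route_to_backup]
kept = apply_chain({"topic": "events", "key": 1, "value": {"id": 1}, "headers": {}}, chain)
dropped = apply_chain({"topic": "events", "key": 2, "value": {"id": 2, "deleted": True}, "headers": {}}, chain)
```

Each function sees exactly one record, which is why SMTs cannot join, aggregate, or deduplicate.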
Built-in SMTs
# Common built-in SMTs
smts = {
    # Field manipulation
    'InsertField': {
        'description': 'Add fields to record (offset.field applies only to sink records)',
        'config': {
            'transforms': 'insertField',
            'transforms.insertField.type': 'org.apache.kafka.connect.transforms.InsertField$Value',
            'transforms.insertField.timestamp.field': 'event_time',
            'transforms.insertField.offset.field': 'kafka_offset'
        }
    },
    # Field removal
    'ReplaceField': {
        'description': 'Remove or rename fields',
        'config': {
            'transforms': 'replaceField',
            'transforms.replaceField.type': 'org.apache.kafka.connect.transforms.ReplaceField$Value',
            'transforms.replaceField.exclude': 'password,secret',
            'transforms.replaceField.renames': 'email:email_address'
        }
    },
    # Type conversion
    'Cast': {
        'description': 'Convert field types',
        'config': {
            'transforms': 'cast',
            'transforms.cast.type': 'org.apache.kafka.connect.transforms.Cast$Value',
            'transforms.cast.spec': 'amount:float64,quantity:int32'
        }
    },
    # Topic routing
    'RegexRouter': {
        'description': 'Route to different topics',
        'config': {
            'transforms': 'route',
            'transforms.route.type': 'org.apache.kafka.connect.transforms.RegexRouter',
            'transforms.route.regex': '(.*)',
            'transforms.route.replacement': 'backup-$1'
        }
    },
    # Time-based topic routing
    'TimestampRouter': {
        'description': 'Add timestamp to topic',
        'config': {
            'transforms': 'route',
            'transforms.route.type': 'org.apache.kafka.connect.transforms.TimestampRouter',
            # yyyy is the calendar year; YYYY would be the week-based year
            'transforms.route.timestamp.format': 'yyyy-MM-dd',
            'transforms.route.topic.format': '${topic}-${timestamp}'
        }
    }
}
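RegexRouter and TimestampRouter rewrite only the topic name. A rough Python model of both, assuming RegexRouter renames only when the regex matches the whole topic name (Java-style `$1` group references) and TimestampRouter formats the record timestamp in UTC; the helper names are hypothetical. In Java's `SimpleDateFormat`, `yyyy` is the calendar year while `YYYY` is the week-based year, which differs around New Year.

```python
import re
from datetime import datetime, timezone


def regex_router(topic, regex, replacement):
    """Rename the topic only when the whole name matches the regex."""
    match = re.fullmatch(regex, topic)
    if not match:
        return topic
    # Translate Java group references ($1) into Python's \g<1> syntax
    python_template = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    return match.expand(python_template)


def timestamp_router(topic, timestamp_ms,
                     topic_format="${topic}-${timestamp}", strftime_format="%Y-%m-%d"):
    """Embed the record timestamp (formatted in UTC) into the topic name."""
    stamp = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc).strftime(strftime_format)
    return topic_format.replace("${topic}", topic).replace("${timestamp}", stamp)


backup_topic = regex_router("orders", "(.*)", "backup-$1")
unchanged = regex_router("orders", "ord", "x-$0")  # partial match: topic left as-is
daily_topic = timestamp_router("events", 1690000000000)
```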
Custom SMT Implementation
# Jython-style sketch; production SMTs are Java classes implementing
# org.apache.kafka.connect.transforms.Transformation
from java.lang import System
from org.apache.kafka.common.config import ConfigDef
from org.apache.kafka.connect.data import Schema, SchemaBuilder, Struct
from org.apache.kafka.connect.sink import SinkRecord
from org.apache.kafka.connect.transforms import Transformation
from org.apache.kafka.connect.transforms.util import SchemaUtil


class EnrichWithMetadata(Transformation):
    """
    Custom SMT that enriches records with metadata.
    Adds:
    - Processing timestamp
    - Source connector name
    - Kafka topic and partition, plus offset for sink records
    Expects Struct values (schema-based converters).
    """
    CONFIG_DEF = ConfigDef()
    CONFIG_DEF.define(
        'metadata.source',
        ConfigDef.Type.STRING,
        'unknown',
        ConfigDef.Importance.HIGH,
        'Source identifier'
    )

    def config(self):
        return self.CONFIG_DEF

    def configure(self, config):
        # config is a java.util.Map
        self.metadata_source = config.getOrDefault('metadata.source', 'unknown')

    def apply(self, record):
        """
        Transform a single record and return a new one.
        Can modify:
        - record.key()
        - record.value()
        - record.headers()
        - record.topic()
        """
        if record.value() is None:
            return record  # tombstones pass through untouched

        value = record.value()
        # Build the extended schema first: Struct.put() rejects
        # fields that are not in the Struct's schema
        new_schema = self._add_metadata_fields(value.schema())
        new_value = Struct(new_schema)

        # Copy existing fields
        for field in value.schema().fields():
            new_value.put(field.name(), value.get(field))

        # Add metadata fields
        new_value.put('processed_at', System.currentTimeMillis())
        new_value.put('source_connector', self.metadata_source)
        new_value.put('kafka_topic', record.topic())
        new_value.put('kafka_partition', record.kafkaPartition())
        if isinstance(record, SinkRecord):
            # Only sink records carry a Kafka offset
            new_value.put('kafka_offset', record.kafkaOffset())

        return record.newRecord(
            record.topic(),
            record.kafkaPartition(),
            record.keySchema(),
            record.key(),
            new_schema,
            new_value,
            record.timestamp(),
            record.headers()
        )

    def _add_metadata_fields(self, original_schema):
        """Copy the original schema and append the metadata fields"""
        # In production, cache the result per input schema instead of rebuilding it per record
        builder = SchemaUtil.copySchemaBasics(original_schema, SchemaBuilder.struct())
        for field in original_schema.fields():
            builder.field(field.name(), field.schema())
        builder.field('processed_at', Schema.INT64_SCHEMA)
        builder.field('source_connector', Schema.STRING_SCHEMA)
        builder.field('kafka_topic', Schema.STRING_SCHEMA)
        # Optional: partition may be null on source records, offset exists only on sink records
        builder.field('kafka_partition', Schema.OPTIONAL_INT32_SCHEMA)
        builder.field('kafka_offset', Schema.OPTIONAL_INT64_SCHEMA)
        return builder.build()

    def close(self):
        pass
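For schemaless data (for example JSON without embedded schemas, where the value arrives as a map), the same enrichment needs no schema handling at all. A plain-Python model of that path, assuming records are dicts with `topic`, `partition`, `offset`, and `value` keys; the function is hypothetical and `now_ms` exists only to make the output reproducible.

```python
import time


def enrich_with_metadata(record, metadata_source="unknown", now_ms=None):
    """Schemaless model of EnrichWithMetadata: returns a new record, never mutates the input."""
    if record.get("value") is None:
        return record  # tombstones pass through untouched
    processed_at = now_ms if now_ms is not None else int(time.time() * 1000)
    value = dict(record["value"])  # shallow copy of the original value
    value.update({
        "processed_at": processed_at,
        "source_connector": metadata_source,
        "kafka_topic": record["topic"],
        "kafka_partition": record.get("partition"),
        "kafka_offset": record.get("offset"),  # None for source records
    })
    return {**record, "value": value}


original = {"topic": "events", "partition": 0, "offset": 42, "value": {"id": 1}}
enriched = enrich_with_metadata(original, "pg-cdc", now_ms=1690000000000)
```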
ℹ️ SMT Performance
SMTs add per-record overhead: every SMT in the chain runs on every record, and each one that changes a record allocates a new record. For high-throughput connectors, keep chains short or replace several simple SMTs with a single combined one.
SMT Configuration Example
{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "3",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "kafka",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "db-",
    "transforms": "insertTimestamp,maskPii,renameTopic",
    "transforms.insertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.insertTimestamp.timestamp.field": "event_timestamp",
    "transforms.maskPii.type": "com.example.MaskPiiTransform",
    "transforms.maskPii.fields": "email,phone",
    "transforms.maskPii.mask.char": "*",
    "transforms.renameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.renameTopic.regex": "db-(.*)",
    "transforms.renameTopic.replacement": "cdc-$1"
  }
}
RegexRouter matches the topic name only (the whole name must match the regex); it never inspects the record, so routing on a field such as "type" needs a content-aware custom SMT. Each alias also takes exactly one regex/replacement pair: repeating those keys in JSON simply overwrites the earlier pair.
SMT Execution Order
Record enters connector pipeline:
1. Source Connector produces record
2. SMT 1 transforms record
3. SMT 2 transforms result of SMT 1
4. SMT 3 transforms result of SMT 2
5. Record sent to Kafka topic
Order matters!
Example:
Original: {"email": "user@example.com", "amount": 100}
SMT 1 (InsertField): adds "processed_at": 1690000000000
SMT 2 (MaskPii): masks email to "u***@e***.com"
SMT 3 (Cast): converts amount to float
Final: {"email": "u***@e***.com", "amount": 100.0, "processed_at": 1690000000000}
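The worked example can be reproduced with plain functions, one per SMT, applied left to right. A sketch assuming a fixed `processed_at` value and a hypothetical masking rule that keeps the first character of the user and domain names; `add_email_domain` is a hypothetical extra step added only to show that swapping two SMTs changes the result.

```python
def insert_field(value):
    # InsertField: a fixed timestamp keeps the example reproducible
    return {**value, "processed_at": 1690000000000}


def mask_pii(value):
    # Hypothetical MaskPii: keep the first character of the user and domain names
    local, domain = value["email"].split("@")
    name, tld = domain.rsplit(".", 1)
    return {**value, "email": f"{local[0]}***@{name[0]}***.{tld}"}


def cast_amount(value):
    # Cast: amount -> float64
    return {**value, "amount": float(value["amount"])}


def add_email_domain(value):
    # Hypothetical enrichment that reads the (possibly masked) email
    return {**value, "email_domain": value["email"].split("@")[1]}


def run_chain(value, chain):
    for smt in chain:
        value = smt(value)
    return value


original = {"email": "user@example.com", "amount": 100}
final = run_chain(original, [insert_field, mask_pii, cast_amount])
domain_first = run_chain(original, [add_email_domain, mask_pii])
mask_first = run_chain(original, [mask_pii, add_email_domain])
```

Extracting the domain before masking yields `example.com`; masking first leaves only `e***.com` for later steps to read.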
⚠️ SMT Ordering
SMTs execute in the order specified in the transforms config. Changing the order can produce different results. Always test SMT chains thoroughly.
Common SMT Patterns
# Pattern 1: Data enrichment
enrichment_smts = {
    'transforms': 'addMetadata,validateSchema,filterNulls',
    # Add processing metadata
    'transforms.addMetadata.type': 'com.example.AddMetadataTransform',
    'transforms.addMetadata.source': 'postgres-cdc',
    # Validate against schema
    'transforms.validateSchema.type': 'com.example.ValidateSchemaTransform',
    'transforms.validateSchema.required.fields': 'id,timestamp,event_type',
    # Drop the field named 'null_fields' (ReplaceField removes fields by name;
    # it cannot remove fields just because their value is null)
    'transforms.filterNulls.type': 'org.apache.kafka.connect.transforms.ReplaceField$Value',
    'transforms.filterNulls.exclude': 'null_fields'
}
# Pattern 2: Data masking for GDPR
gdpr_smts = {
    'transforms': 'maskEmail,maskPhone,maskSSN',
    'transforms.maskEmail.type': 'com.example.MaskTransform',
    'transforms.maskEmail.field': 'email',
    'transforms.maskEmail.pattern': '(.{2}).*(@.*)',
    'transforms.maskEmail.replacement': '$1***$2',
    'transforms.maskPhone.type': 'com.example.MaskTransform',
    'transforms.maskPhone.field': 'phone',
    'transforms.maskPhone.pattern': '(\\d{3})\\d{4}(\\d{4})',
    'transforms.maskPhone.replacement': '$1****$2',
    'transforms.maskSSN.type': 'com.example.MaskTransform',
    'transforms.maskSSN.field': 'ssn',
    'transforms.maskSSN.pattern': '(\\d{3})\\d{2}(\\d{4})',
    'transforms.maskSSN.replacement': '$1**$2'
}
# Pattern 3: Schema evolution
schema_smts = {
    'transforms': 'addVersion,addTimestamp',
    'transforms.addVersion.type': 'com.example.AddSchemaVersion',
    'transforms.addVersion.version.field': 'schema_version',
    'transforms.addVersion.version.value': '2.0',
    'transforms.addTimestamp.type': 'org.apache.kafka.connect.transforms.InsertField$Value',
    'transforms.addTimestamp.timestamp.field': 'event_time'
}
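Before deploying regex-based masking, it is worth checking the patterns against sample values. A sketch that applies the GDPR patterns with Python's `re`, assuming the hypothetical `com.example.MaskTransform` replaces every match and uses Java-style `$1` references (translated here to Python's `\g<1>`). Note that the phone pattern expects exactly 11 digits and the SSN pattern expects 9 bare digits, so formatted values such as `123-45-6789` pass through unmasked.

```python
import re

MASKS = {
    "email": ('(.{2}).*(@.*)', '$1***$2'),
    "phone": ('(\\d{3})\\d{4}(\\d{4})', '$1****$2'),
    "ssn": ('(\\d{3})\\d{2}(\\d{4})', '$1**$2'),
}


def java_replacement_to_python(replacement):
    # $1 -> \g<1> so Python's re understands the group reference
    return re.sub(r"\$(\d+)", r"\\g<\1>", replacement)


def mask_record(value):
    masked = dict(value)
    for field, (pattern, replacement) in MASKS.items():
        if isinstance(masked.get(field), str):
            masked[field] = re.sub(pattern, java_replacement_to_python(replacement), masked[field])
    return masked


sample = {"email": "alice@example.com", "phone": "13812345678", "ssn": "123456789"}
masked = mask_record(sample)
unmatched = mask_record({"ssn": "123-45-6789"})
```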
Connector Performance Tuning
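# Source connector performance (JDBC source connector settings)
source_config = {
    'batch.max.rows': 1000,          # Max rows returned per poll()
    'poll.interval.ms': 1000,        # Delay between polls when there is no new data
    # 'snapshot.mode': 'initial',    # Debezium CDC connectors only; not a JDBC setting
    # Task configuration
    'tasks.max': 3,                  # Upper bound on parallel tasks
    # Producer tuning (connector-level overrides; the worker needs
    # connector.client.config.override.policy=All)
    'producer.override.batch.size': 32768,
    'producer.override.linger.ms': 10,
    'producer.override.compression.type': 'lz4',
    'producer.override.acks': 'all',
    'producer.override.enable.idempotence': True
}
# Sink connector performance (these names follow e.g. the Elasticsearch
# sink connector; sink settings vary by connector)
sink_config = {
    'batch.size': 1000,              # Records per write batch
    'flush.timeout.ms': 10000,       # Max time to wait for a flush to complete
    'max.retries': 3,                # Retry count on write failure
    'retry.backoff.ms': 1000,        # Delay between retries
    # Consumer tuning (connector-level overrides)
    'consumer.override.fetch.min.bytes': 1024,
    'consumer.override.fetch.max.wait.ms': 500,
    'consumer.override.max.partition.fetch.bytes': 1048576,
    # Tasks (beyond one task per topic partition, extra tasks sit idle)
    'tasks.max': 3
}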
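ℹ️ Production Tip
Monitor Connect's per-task JMX metrics: batch-size-avg and offset-commit-failure-percentage (connector-task-metrics), source-record-poll-rate or sink-record-read-rate, and total-record-errors (task-error-metrics). Alert when the record error rate exceeds 0.1% or batch-size-avg stays below 50% of the configured maximum.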
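The two alert rules reduce to simple threshold checks once the metric values are scraped. A sketch assuming you already collect a record count, the task's error count, and `batch-size-avg`; the function name and parameter names are hypothetical, and the defaults encode the 0.1% and 50% thresholds.

```python
def connect_alerts(total_records, total_record_errors, batch_size_avg, configured_batch_max,
                   max_error_rate=0.001, min_fill_ratio=0.5):
    """Return human-readable alerts for one task; an empty list means healthy."""
    alerts = []
    error_rate = total_record_errors / total_records if total_records else 0.0
    if error_rate > max_error_rate:
        alerts.append(f"error rate {error_rate:.2%} exceeds {max_error_rate:.2%}")
    if batch_size_avg < min_fill_ratio * configured_batch_max:
        alerts.append(
            f"batch-size-avg {batch_size_avg} below {min_fill_ratio:.0%} of {configured_batch_max}"
        )
    return alerts


healthy = connect_alerts(100_000, 50, 900, 1000)    # 0.05% errors, batches 90% full
degraded = connect_alerts(100_000, 500, 200, 1000)  # 0.5% errors, batches 20% full
```

Small batches usually mean the source has little data or the flush timeout fires too early; they are a throughput signal rather than an error.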
Exactly-Once Formula
import math


def calculate_connect_exactly_once_cost(
    records_per_sec: int,
    batch_size: int,
    transaction_overhead_ms: float,
    external_write_ms: float,
    target_reduction: float = 0.05,
) -> dict:
    """
    Back-of-envelope cost model (assumptions, not measured Connect behavior):
    - without the extra overhead, the task processes records_per_sec
    - every batch pays one transaction commit (transaction_overhead_ms)
      plus one extra external round-trip (external_write_ms)
    The fixed per-batch cost is amortized over batch_size records.
    """
    baseline_ms_per_batch = batch_size * 1000.0 / records_per_sec
    overhead_ms_per_batch = transaction_overhead_ms + external_write_ms
    effective_rps = batch_size * 1000.0 / (baseline_ms_per_batch + overhead_ms_per_batch)
    reduction = 1 - effective_rps / records_per_sec
    # Smallest batch whose overhead share stays at or below target_reduction:
    # overhead / (baseline + overhead) <= t  =>  batch >= rps * overhead * (1 - t) / (1000 * t)
    recommended = math.ceil(
        records_per_sec * overhead_ms_per_batch * (1 - target_reduction)
        / (1000.0 * target_reduction)
    )
    return {
        'overhead_ms_per_batch': overhead_ms_per_batch,
        'effective_records_per_sec': effective_rps,
        'throughput_reduction_percent': reduction * 100,
        'recommended_batch_size': max(100, recommended),
    }


# Example: 10k records/s baseline, 1,000-record batches, 10 ms commit + 5 ms write
cost = calculate_connect_exactly_once_cost(
    records_per_sec=10000,
    batch_size=1000,
    transaction_overhead_ms=10,
    external_write_ms=5
)
print(f"Throughput reduction: {cost['throughput_reduction_percent']:.1f}%")
print(f"Recommended batch size: {cost['recommended_batch_size']}")
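⚠️ Key Insight
Kafka Connect exactly-once is different from Kafka Streams exactly-once. Streams covers read-process-write entirely inside Kafka. Connect's built-in exactly-once (KIP-618) covers only source connectors, from the source system into Kafka; for sink connectors, true end-to-end exactly-once requires idempotent writes or offsets stored atomically in the external system.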