πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Kafka Connect: Exactly-Once, Single Message Transform

Apache KafkaKafka Connect⭐ Premium

Advertisement

Kafka Connect: Exactly-Once, Single Message Transform

Difficulty: Senior | Asked at: Confluent, LinkedIn, Airbnb, Stripe

ℹ️Interview Context

Kafka Connect is the standard framework for integrating Kafka with external systems. Interviewers expect you to understand the architecture, exactly-once semantics, and how to customize behavior with SMTs.

The Question

Explain how Kafka Connect achieves exactly-once semantics. How do Single Message Transforms (SMTs) work? What's the difference between source and sink connector exactly-once delivery?

Kafka Connect Architecture

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                  Kafka Connect Cluster                    β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                         β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚  Worker 1   β”‚  β”‚  Worker 2   β”‚  β”‚  Worker 3   β”‚   β”‚
β”‚  β”‚             β”‚  β”‚             β”‚  β”‚             β”‚   β”‚
β”‚  β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”β”‚  β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”β”‚  β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”β”‚   β”‚
β”‚  β”‚ β”‚Source   β”‚β”‚  β”‚ β”‚Sink     β”‚β”‚  β”‚ β”‚Source   β”‚β”‚   β”‚
β”‚  β”‚ β”‚Task     β”‚β”‚  β”‚ β”‚Task     β”‚β”‚  β”‚ β”‚Task     β”‚β”‚   β”‚
β”‚  β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜β”‚  β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜β”‚  β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                         β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚            Distributed Mode                     β”‚   β”‚
β”‚  β”‚  - Consumer Group for task distribution         β”‚   β”‚
β”‚  β”‚  - Config storage in Kafka topics               β”‚   β”‚
β”‚  β”‚  - Offset storage in Kafka topics               β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Connect Worker Configuration

# worker.properties (distributed mode)
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
group.id=connect-cluster

# Key settings for exactly-once
connector.client.config.override.policy=All

# Storage
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status

# Replication
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3

# Exactly-once support (Kafka 2.5+)
exactly.once.support=enabled

# Performance
tasks.max=10
plugin.path=/usr/share/java,/usr/share/kafka/plugins

Source Connector Exactly-Once

How Source Exactly-Once Works

Architecture Diagram
Source Connector Exactly-Once Flow:

1. Source Task polls external system
2. Source Task produces messages to Kafka
3. Source Task commits source offsets to Kafka
4. If worker crashes:
   - New worker reads committed offsets from Kafka
   - Resumes from last committed offset
   - Messages may be replayed (at-least-once)
   
5. With exactly-once:
   - Source Task uses transactions
   - Messages and offsets committed atomically
   - No duplicates even on failure

Source Task Implementation

# Source task with exactly-once
from kafka.connect import SourceTask
import time

class DatabaseSourceTask(SourceTask):
    def __init__(self):
        self.connector_config = None
        self.db_connection = None
        self.last_offset = None
    
    def start(self, config):
        """
        Initialize source task.
        
        Config includes:
        - connection.url
        - topic
        - mode (bulk, timestamp, incrementing)
        - timestamp.column.name
        - incrementing.column.name
        """
        self.connector_config = config
        self.db_connection = self._create_connection(config['connection.url'])
        self.last_offset = self._load_offset()
    
    def poll(self):
        """
        Poll external system and return records.
        
        Returns list of SourceRecord:
        - topic
        - key
        - value
        - sourcePartition (for deduplication)
        - sourceOffset (for resumption)
        """
        records = []
        
        # Query new records
        query = f"""
            SELECT * FROM events
            WHERE id > {self.last_offset or 0}
            ORDER BY id
            LIMIT 1000
        """
        
        cursor = self.db_connection.execute(query)
        rows = cursor.fetchall()
        
        for row in rows:
            record = SourceRecord(
                topic=self.connector_config['topic'],
                key={'id': row['id']},
                value=row,
                sourcePartition={'table': 'events'},
                sourceOffset={'id': row['id']}
            )
            records.append(record)
        
        if rows:
            self.last_offset = rows[-1]['id']
        
        return records
    
    def stop(self):
        """Clean up resources"""
        if self.db_connection:
            self.db_connection.close()

class SourceRecord:
    def __init__(self, topic, key, value, sourcePartition, sourceOffset):
        self.topic = topic
        self.key = key
        self.value = value
        self.sourcePartition = sourcePartition
        self.sourceOffset = sourceOffset

Source Offset Storage

# Source offsets stored in connect-offsets topic
# Key: sourcePartition
# Value: sourceOffset

# Example:
# Key: {"connector": "my-source", "task": 0, "table": "events"}
# Value: {"id": 12345, "timestamp": 1690000000000}

# On restart, task reads its last committed offset
# and resumes from that point

class OffsetManager:
    def __init__(self, connect_worker):
        self.worker = connect_worker
    
    def commit_offset(self, source_partition, source_offset):
        """
        Commit source offset to Kafka.
        
        In exactly-once mode, this is done within a transaction
        along with the produced messages.
        """
        self.worker.commit_source_offset(
            source_partition=source_partition,
            source_offset=source_offset
        )
    
    def load_offset(self, source_partition):
        """
        Load last committed offset.
        
        Returns None if no offset exists (first run).
        """
        return self.worker.read_source_offset(source_partition)

ℹ️Source Exactly-Once

Source connector exactly-once requires the external system to support idempotent reads. If the source system provides unique IDs or timestamps, the connector can deduplicate messages on restart. Without this, you get at-least-once delivery.

Sink Connector Exactly-Once

How Sink Exactly-Once Works

Architecture Diagram
Sink Connector Exactly-Once Flow:

1. Consumer polls messages from Kafka
2. Sink Task writes to external system
3. Consumer commits offsets to Kafka
4. If worker crashes:
   - New worker reads committed offsets
   - Resumes from last committed offset
   - May re-process some messages (at-least-once)
   
5. With exactly-once:
   - Sink Task writes to external system in transaction
   - Consumer offsets committed atomically
   - If write fails, offsets not committed
   - Messages reprocessed on restart (at-least-once to external)
   - But Kafka offsets ensure no Kafka-level duplicates

Sink Task Implementation

from kafka.connect import SinkTask
import psycopg2

class DatabaseSinkTask(SinkTask):
    def __init__(self):
        self.config = None
        self.db_connection = None
        self.batch_size = 1000
    
    def start(self, config):
        self.config = config
        self.db_connection = self._create_connection(config['connection.url'])
    
    def put(self, records):
        """
        Process sink records.
        
        In exactly-once mode:
        - Records are delivered in a transaction
        - If write fails, task throws exception
        - Worker retries the batch
        """
        # Batch insert for performance
        for i in range(0, len(records), self.batch_size):
            batch = records[i:i + self.batch_size]
            self._insert_batch(batch)
    
    def _insert_batch(self, records):
        """
        Insert batch of records.
        
        Uses INSERT ON CONFLICT for idempotency.
        """
        query = """
            INSERT INTO events (id, data, timestamp)
            VALUES (%s, %s, %s)
            ON CONFLICT (id) DO UPDATE
            SET data = EXCLUDED.data,
                timestamp = EXCLUDED.timestamp
        """
        
        values = [
            (r.key['id'], r.value, r.timestamp)
            for r in records
        ]
        
        cursor = self.db_connection.cursor()
        cursor.executemany(query, values)
        self.db_connection.commit()
    
    def stop(self):
        if self.db_connection:
            self.db_connection.close()

class SinkRecord:
    def __init__(self, topic, partition, offset, key, value, timestamp):
        self.topic = topic
        self.partition = partition
        self.offset = offset
        self.key = key
        self.value = value
        self.timestamp = timestamp

Sink Offset Commit Strategy

class ExactlyOnceSinkTask(SinkTask):
    """
    Sink task with exactly-once semantics.
    
    Uses Kafka transactions to atomically:
    1. Write to external system
    2. Commit consumer offsets
    """
    
    def put(self, records):
        """
        Process records with transactional guarantees.
        
        Connect framework handles:
        - Starting transaction
        - Committing offsets within transaction
        - Aborting on failure
        """
        # Write to external system
        self._write_to_database(records)
        
        # Framework automatically commits offsets
        # as part of the transaction
        
    def _write_to_database(self, records):
        """
        Write records to database.
        
        Must be idempotent (use UPSERT/INSERT ON CONFLICT)
        to handle retries safely.
        """
        for record in records:
            self._upsert_record(record)
    
    def _upsert_record(self, record):
        """Insert or update single record"""
        query = """
            INSERT INTO events (id, payload, created_at)
            VALUES (%s, %s, %s)
            ON CONFLICT (id) DO UPDATE
            SET payload = EXCLUDED.payload,
                updated_at = NOW()
        """
        self.db_connection.execute(query, (
            record.key['id'],
            record.value,
            record.timestamp
        ))

⚠️Sink Exactly-Once Limitation

Sink connector exactly-once guarantees exactly-once delivery to Kafka (offsets), but NOT exactly-once to the external system. The external system must support idempotent writes to avoid duplicates on retry.

Single Message Transforms (SMTs)

How SMTs Work

Architecture Diagram
Message Flow with SMTs:

Source β†’ SMT 1 β†’ SMT 2 β†’ SMT 3 β†’ Kafka Topic

Each SMT transforms one message at a time:
- Can modify key, value, headers
- Can add/remove fields
- Can change topic
- Cannot access other messages (stateless)

Built-in SMTs

# Common built-in SMTs

smts = {
    # Field manipulation
    'InsertField': {
        'description': 'Add new field to record',
        'config': {
            'transforms': 'insertField',
            'transforms.insertField.type': 'org.apache.kafka.connect.transforms.InsertField$Value',
            'transforms.insertField.timestamp.field': 'event_time',
            'transforms.insertField.offset.field': 'kafka_offset'
        }
    },
    
    # Field removal
    'ReplaceField': {
        'description': 'Remove or rename fields',
        'config': {
            'transforms': 'replaceField',
            'transforms.replaceField.type': 'org.apache.kafka.connect.transforms.ReplaceField$Value',
            'transforms.replaceField.exclude': 'password,secret',
            'transforms.replaceField.renamed': 'email:email_address'
        }
    },
    
    # Type conversion
    'Cast': {
        'description': 'Convert field types',
        'config': {
            'transforms': 'cast',
            'transforms.cast.type': 'org.apache.kafka.connect.transforms.Cast$Value',
            'transforms.cast.spec': 'amount:float64,quantity:int32'
        }
    },
    
    # Topic routing
    'RegexRouter': {
        'description': 'Route to different topics',
        'config': {
            'transforms': 'route',
            'transforms.route.type': 'org.apache.kafka.connect.transforms.RegexRouter',
            'transforms.route.regex': '(.*)',
            'transforms.route.replacement': 'backup-$1'
        }
    },
    
    # Timestamp extraction
    'TimestampRouter': {
        'description': 'Add timestamp to topic',
        'config': {
            'transforms': 'route',
            'transforms.route.type': 'org.apache.kafka.connect.transforms.TimestampRouter',
            'transforms.route.timestamp.format': 'YYYY-MM-dd',
            'transforms.route.topic.format': '${topic}-${timestamp}'
        }
    }
}

Custom SMT Implementation

from org.apache.kafka.connect.transforms import Transformation
from org.apache.kafka.connect.data import Schema, Struct
from org.apache.kafka.common.config import ConfigDef

class EnrichWithMetadata(Transformation):
    """
    Custom SMT that enriches records with metadata.
    
    Adds:
    - Processing timestamp
    - Source connector name
    - Custom computed field
    """
    
    CONFIG_DEF = ConfigDef()
    CONFIG_DEF.define(
        'metadata.source',
        ConfigDef.Type.STRING,
        'unknown',
        ConfigDef.Importance.HIGH,
        'Source identifier'
    )
    
    def config(self):
        return self.CONFIG_DEF
    
    def configure(self, config):
        self.metadata_source = config.get('metadata.source', 'unknown')
    
    def apply(self, record):
        """
        Transform single record.
        
        Can modify:
        - record.key()
        - record.value()
        - record.headers()
        - record.topic()
        """
        if record.value() is None:
            return record
        
        # Clone the struct
        value = record.value()
        new_value = Struct(value.schema())
        
        # Copy existing fields
        for field in value.schema().fields():
            new_value.put(field.name(), value.get(field))
        
        # Add metadata fields
        new_value.put('processed_at', System.currentTimeMillis())
        new_value.put('source_connector', self.metadata_source)
        new_value.put('kafka_topic', record.topic())
        new_value.put('kafka_partition', record.kafkaPartition())
        new_value.put('kafka_offset', record.kafkaOffset())
        
        # Create new schema with additional fields
        new_schema = self._add_metadata_fields(value.schema())
        
        return record.newRecord(
            record.topic(),
            record.kafkaPartition(),
            record.keySchema(),
            record.key(),
            new_schema,
            new_value,
            record.timestamp(),
            record.headers()
        )
    
    def _add_metadata_fields(self, original_schema):
        """Add metadata fields to schema"""
        builder = SchemaBuilder(original_schema.type())
        
        for field in original_schema.fields():
            builder.field(field.name(), field.schema())
        
        builder.field('processed_at', Schema.INT64_SCHEMA)
        builder.field('source_connector', Schema.STRING_SCHEMA)
        builder.field('kafka_topic', Schema.STRING_SCHEMA)
        builder.field('kafka_partition', Schema.INT32_SCHEMA)
        builder.field('kafka_offset', Schema.INT64_SCHEMA)
        
        return builder.build()
    
    def close(self):
        pass

ℹ️SMT Performance

SMTs add processing overhead. Each SMT is applied to every record. For high-throughput connectors, minimize SMTs or use single complex SMT instead of multiple simple ones.

SMT Configuration Example

{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.confluent.connect.postgres.PostgresSourceConnector",
    "tasks.max": "3",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "kafka",
    "topic": "db-events",
    
    "transforms": "insertTimestamp,maskPii,routeByType",
    
    "transforms.insertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.insertTimestamp.timestamp.field": "event_timestamp",
    
    "transforms.maskPii.type": "com.example.MaskPiiTransform",
    "transforms.maskPii.fields": "email,phone",
    "transforms.maskPii.mask.char": "*",
    
    "transforms.routeByType.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.routeByType.regex": ".*\"type\":\"order\".*",
    "transforms.routeByType.replacement": "orders-$1",
    
    "transforms.routeByType.regex": ".*\"type\":\"user\".*",
    "transforms.routeByType.replacement": "users-$1"
  }
}

SMT Execution Order

Architecture Diagram
Record enters connector pipeline:

1. Source Connector produces record
2. SMT 1 transforms record
3. SMT 2 transforms result of SMT 1
4. SMT 3 transforms result of SMT 2
5. Record sent to Kafka topic

Order matters!
Example:
  Original: {"email": "user@example.com", "amount": 100}
  
  SMT 1 (InsertField): adds "processed_at": 1690000000000
  SMT 2 (MaskPii): masks email to "u***@e***.com"
  SMT 3 (Cast): converts amount to float
  
  Final: {"email": "u***@e***.com", "amount": 100.0, "processed_at": 1690000000000}

⚠️SMT Ordering

SMTs execute in the order specified in the transforms config. Changing the order can produce different results. Always test SMT chains thoroughly.

Common SMT Patterns

# Pattern 1: Data enrichment
enrichment_smts = {
    'transforms': 'addMetadata,validateSchema,filterNulls',
    
    # Add processing metadata
    'transforms.addMetadata.type': 'com.example.AddMetadataTransform',
    'transforms.addMetadata.source': 'postgres-cdc',
    
    # Validate against schema
    'transforms.validateSchema.type': 'com.example.ValidateSchemaTransform',
    'transforms.validateSchema.required.fields': 'id,timestamp,event_type',
    
    # Remove null fields
    'transforms.filterNulls.type': 'org.apache.kafka.connect.transforms.ReplaceField$Value',
    'transforms.filterNulls.exclude': 'null_fields'
}

# Pattern 2: Data masking for GDPR
gdpr_smts = {
    'transforms': 'maskEmail,maskPhone,maskSSN',
    
    'transforms.maskEmail.type': 'com.example.MaskTransform',
    'transforms.maskEmail.field': 'email',
    'transforms.maskEmail.pattern': '(.{2}).*(@.*)',
    'transforms.maskEmail.replacement': '$1***$2',
    
    'transforms.maskPhone.type': 'com.example.MaskTransform',
    'transforms.maskPhone.field': 'phone',
    'transforms.maskPhone.pattern': '(\\d{3})\\d{4}(\\d{4})',
    'transforms.maskPhone.replacement': '$1****$2',
    
    'transforms.maskSSN.type': 'com.example.MaskTransform',
    'transforms.maskSSN.field': 'ssn',
    'transforms.maskSSN.pattern': '(\\d{3})\\d{2}(\\d{4})',
    'transforms.maskSSN.replacement': '$1**$2'
}

# Pattern 3: Schema evolution
schema_smts = {
    'transforms': 'addVersion,addTimestamp',
    
    'transforms.addVersion.type': 'com.example.AddSchemaVersion',
    'transforms.addVersion.version.field': 'schema_version',
    'transforms.addVersion.version.value': '2.0',
    
    'transforms.addTimestamp.type': 'org.apache.kafka.connect.transforms.InsertField$Value',
    'transforms.addTimestamp.timestamp.field': 'event_time'
}

Connector Performance Tuning

# Source connector performance
source_config = {
    'batch.max.rows': 1000,           # Records per poll
    'poll.interval.ms': 1000,         # Poll frequency
    'snapshot.mode': 'initial',       # Snapshot behavior
    
    # Task configuration
    'tasks.max': 3,                   # Parallel tasks
    
    # Producer tuning (source)
    'producer.batch.size': 32768,
    'producer.linger.ms': 10,
    'producer.compression.type': 'lz4',
    'producer.acks': 'all',
    'producer.enable.idempotence': True
}

# Sink connector performance
sink_config = {
    'batch.size': 1000,               # Records per flush
    'flush.timeout.ms': 10000,        # Max time before flush
    'max.retries': 3,                 # Retry count
    'retry.backoff.ms': 1000,         # Retry delay
    
    # Consumer tuning (sink)
    'consumer.fetch.min.bytes': 1024,
    'consumer.fetch.max.wait.ms': 500,
    'consumer.max.partition.fetch.bytes': 1048576,
    
    # Tasks
    'tasks.max': 3
}

ℹ️Production Tip

Monitor connect-metrics for task performance. Key metrics: record-count, record-error-rate, batch-size-avg. Set alerts on error rate > 0.1% and batch size < 50% of configured max.

Exactly-Once Formula

Exactly-OnceΒ Cost=TransactionΒ Overhead+IdempotentΒ WriteΒ Cost\text{Exactly-Once Cost} = \text{Transaction Overhead} + \text{Idempotent Write Cost}
def calculate_connect_exactly_once_cost(
    records_per_sec: int,
    avg_record_size: int,
    transaction_overhead_ms: float,
    external_write_ms: float
) -> dict:
    """
    Calculate performance cost of exactly-once in Connect.
    """
    # Transaction overhead (Kafka side)
    transaction_cost_per_sec = transaction_overhead_ms / 1000
    
    # External write cost (database side)
    write_cost_per_sec = external_write_ms / 1000
    
    # Total overhead
    total_overhead = transaction_cost_per_sec + write_cost_per_sec
    
    # Throughput reduction
    throughput_reduction = (total_overhead * records_per_sec) / records_per_sec
    
    return {
        'transaction_overhead_per_sec': transaction_cost_per_sec,
        'write_overhead_per_sec': write_cost_per_sec,
        'total_overhead_per_sec': total_overhead,
        'throughput_reduction_percent': throughput_reduction * 100,
        'recommended_batch_size': max(100, int(records_per_sec * transaction_cost_per_sec))
    }

# Example
cost = calculate_connect_exactly_once_cost(
    records_per_sec=10000,
    avg_record_size=500,
    transaction_overhead_ms=10,
    external_write_ms=5
)
print(f"Throughput reduction: {cost['throughput_reduction_percent']:.1f}%")
print(f"Recommended batch size: {cost['recommended_batch_size']}")

⚠️Key Insight

Kafka Connect exactly-once is different from Kafka Streams exactly-once. Connect guarantees exactly-once from source to Kafka, but the external system (sink) must support idempotent writes for true end-to-end exactly-once.

Advertisement