Kafka Connect Patterns

Difficulty: Senior Level | Companies: LinkedIn, Uber, Netflix, Spotify, Confluent

Content

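Kafka Connect is a framework for streaming data between Kafka and external systems. It scales by spreading connector tasks across workers, tolerates worker failures by rebalancing tasks and resuming from committed offsets, and delivers at-least-once by default. Exactly-once delivery is possible when both the connector and the worker configuration support it.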

Architecture Overview

Architecture Diagram
Kafka Connect Cluster:
┌─────────────────────────────────────────────────┐
│  Worker 1              Worker 2                 │
│  ┌─────────────┐      ┌─────────────┐           │
│  │ Source Task │      │ Source Task │           │
│  │ (JDBC)      │      │ (File)      │           │
│  └─────────────┘      └─────────────┘           │
│         │                    │                  │
│         ▼                    ▼                  │
│  ┌─────────────┐      ┌─────────────┐           │
│  │ Sink Task   │      │ Sink Task   │           │
│  │ (Elastic)   │      │ (S3)        │           │
│  └─────────────┘      └─────────────┘           │
└─────────────────────────────────────────────────┘

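Components:
- Workers: JVM processes that host connectors and tasks; in distributed mode the workers in a group share the load and rebalance when one fails
- Tasks: Do the actual copying, either polling a source system or writing to a sink
- Connectors: Define the data flow configuration and split the work into tasks (up to tasks.max)
- Transformations: Modify records in flight, one message at a time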
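
To make the connector/task split concrete, here is a minimal sketch of how a connector might divide its work (a list of tables, say) into at most tasks.max groups, one per task. The function name assign_tables and the round-robin strategy are assumptions for illustration; real connectors decide this in their own taskConfigs logic and may group work differently.

```python
def assign_tables(tables, tasks_max):
    """Split tables round-robin into at most tasks_max non-empty groups."""
    if tasks_max < 1:
        raise ValueError("tasks_max must be at least 1")
    # Never create more tasks than there are units of work
    num_tasks = min(tasks_max, len(tables))
    groups = [[] for _ in range(num_tasks)]
    for i, table in enumerate(tables):
        groups[i % num_tasks].append(table)
    return groups


print(assign_tables(["orders", "customers", "payments"], 2))
```

Raising tasks.max beyond the number of tables adds no parallelism in this model, which mirrors why a single-table source often runs as one task regardless of the setting.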

Source Connector Example

{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres.example.com",
    "database.port": "5432",
    "database.user": "kafka-connect",
    "database.password": "${secrets:postgres-password}",
    "database.dbname": "orders_db",
    "schema.include.list": "public",
    "table.include.list": "public.orders,public.customers",
    
    "plugin.name": "pgoutput",
    "slot.name": "kafka_connect",
    "publication.name": "kafka_connect_pub",
    
    "topic.prefix": "cdc",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
  }
}
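
The routing transform in this config can be hard to read at a glance, so here is a small Python sketch of what it does to a topic name. It assumes RegexRouter rewrites a topic only when the whole name matches the pattern, and it translates Java-style `$3` group references into Python's syntax; the function name regex_router is hypothetical.

```python
import re


def regex_router(topic, regex, replacement):
    """Rename a topic only when the full topic name matches the regex."""
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic  # non-matching topics pass through unchanged
    # Translate Java-style $1 group references into Python's \g<1> form
    py_replacement = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    return match.expand(py_replacement)


pattern = r"([^.]+)\.([^.]+)\.([^.]+)"
print(regex_router("cdc.public.orders", pattern, "$3"))
print(regex_router("heartbeat", pattern, "$3"))
```

One design consequence worth noting: keeping only the third group drops the schema name, so two tables with the same name in different schemas would be routed to the same topic.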

REST API Deployment

# Deploy source connector
curl -X POST http://connect-cluster:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres-source",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres.example.com",
      "database.port": "5432",
      "database.user": "kafka-connect",
      "database.dbname": "orders_db",
      "topic.prefix": "cdc"
    }
  }'

# Check connector status
curl http://connect-cluster:8083/connectors/postgres-source/status

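# Restart connector (this restarts only the Connector instance, not its tasks;
# on Kafka 3.0+ append ?includeTasks=true&onlyFailed=true, with the URL quoted,
# to restart failed tasks as well)
curl -X POST http://connect-cluster:8083/connectors/postgres-source/restart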
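
POST /connectors fails if the connector already exists, so deployment scripts often use PUT /connectors/{name}/config instead, which creates the connector or updates its configuration. The sketch below only builds that request with the standard library, so it runs without a cluster; the helper name build_upsert_request is an assumption, and the host is the one used in the examples above.

```python
import json
import urllib.request


def build_upsert_request(connect_url, name, config):
    """Build (but do not send) a PUT /connectors/{name}/config request."""
    url = f"{connect_url.rstrip('/')}/connectors/{name}/config"
    # The body is the bare config map, without the {"name": ..., "config": ...} wrapper
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


request = build_upsert_request(
    "http://connect-cluster:8083",
    "postgres-source",
    {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "topic.prefix": "cdc",
    },
)
print(request.get_method(), request.full_url)
# To send it: urllib.request.urlopen(request, timeout=10)
```

Because the call is idempotent, the same script can run on every deploy without first checking whether the connector exists.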

Sink Connector Example

{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "enriched-orders",
    "connection.url": "https://elasticsearch:9200",
    
    "type.name": "_doc",
    "key.ignore": "false",
    "schema.ignore": "false",
    
    "transforms": "extractKey",
    "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKey.field": "order_id",
    
    "write.method": "upsert",
    "behavior.on.null.values": "delete",
    
    "batch.size": 500,
    "linger.ms": 10,
    "flush.timeout.ms": 10000,
    
    "elastic.security.protocol": "SSL",
    "elastic.https.ssl.truststore.location": "/etc/kafka/secrets/truststore.jks",
    "elastic.https.ssl.truststore.password": "${secrets:truststore-password}"
  }
}
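
The combination of key.ignore=false, write.method=upsert and behavior.on.null.values=delete determines how the index evolves, and a toy model makes that easier to reason about. The sketch below uses a plain dict as a stand-in for an Elasticsearch index; it assumes an upsert merges the record's fields into any existing document with the same ID, and the function name apply_to_index is hypothetical.

```python
def apply_to_index(index, records):
    """Apply (key, value) records to a dict that stands in for an index."""
    for key, value in records:
        if value is None:
            # behavior.on.null.values=delete: a tombstone removes the document
            index.pop(key, None)
        else:
            # write.method=upsert: merge fields into the document, or create it
            index.setdefault(key, {}).update(value)
    return index


index = apply_to_index({}, [
    ("o-1", {"status": "NEW", "total": 40}),
    ("o-1", {"status": "PAID"}),  # partial update keeps "total"
    ("o-2", {"status": "NEW"}),
    ("o-2", None),                # tombstone deletes o-2
])
print(index)
```

Since the record key becomes the document ID, replaying the same records leaves the index in the same state, which is what makes this sink configuration tolerant of redelivery.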

Single Message Transforms (SMTs)

Common Transformations

{
  "transforms": "route,extract,mask",
  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.route.regex": "cdc\\.public\\.(.*)",
  "transforms.route.replacement": "$1",
  
  "transforms.extract.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extract.field": "order_id",
  
  "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.mask.fields": "email,phone",
  "transforms.mask.replacement": "****"
}
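
Transforms run in the order listed in the transforms property, and each one receives the output of the previous one. The following sketch models the extract and mask steps on a dict-based record to show that ordering; the record layout and all function names are assumptions for illustration, not the Connect API.

```python
def extract_key_field(record, field):
    """Like ExtractField$Key: replace a struct-like key with one of its fields."""
    return {**record, "key": record["key"][field]}


def mask_value_fields(record, fields, replacement):
    """Like MaskField$Value with a replacement: overwrite the listed value fields."""
    value = {k: (replacement if k in fields else v) for k, v in record["value"].items()}
    return {**record, "value": value}


def apply_chain(record, transforms):
    """Apply transforms in list order, each on the previous result."""
    for transform in transforms:
        record = transform(record)
    return record


record = {
    "topic": "orders",
    "key": {"order_id": 42},
    "value": {"order_id": 42, "email": "a@example.com", "phone": "555-0100", "total": 40},
}
result = apply_chain(record, [
    lambda r: extract_key_field(r, "order_id"),
    lambda r: mask_value_fields(r, {"email", "phone"}, "****"),
])
print(result)
```

Each step returns a new record rather than mutating its input, which matches how SMTs are expected to behave and keeps the chain easy to test step by step.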

Custom SMT

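import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

// Transformation is an interface, and R must be a ConnectRecord so the SMT
// works for both source and sink records.
public class TimestampConverter<R extends ConnectRecord<R>> implements Transformation<R> {

    // Keys declared here must match the keys read in configure()
    private static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define("timestamp.field", Type.STRING, Importance.HIGH,
                "Name of the INT64 epoch-millisecond field to format")
        .define("target.format", Type.STRING, Importance.HIGH,
                "DateTimeFormatter pattern for the formatted value");

    private String timestampField;
    private DateTimeFormatter formatter;

    @Override
    public void configure(Map<String, ?> configs) {
        AbstractConfig config = new AbstractConfig(CONFIG_DEF, configs);
        this.timestampField = config.getString("timestamp.field");
        // DateTimeFormatter is thread-safe, unlike SimpleDateFormat; UTC is an assumption
        this.formatter = DateTimeFormatter
            .ofPattern(config.getString("target.format"))
            .withZone(ZoneOffset.UTC);
    }

    @Override
    public R apply(R record) {
        // Pass tombstones and schemaless values through unchanged
        if (!(record.value() instanceof Struct)) {
            return record;
        }
        Struct value = (Struct) record.value();
        Schema schema = value.schema();
        if (schema.field(timestampField) == null) {
            return record;
        }

        // A Struct is bound to its schema, so adding a field needs a new schema.
        // Rebuilt per record for brevity; cache it per input schema in production.
        SchemaBuilder builder = SchemaBuilder.struct();
        if (schema.name() != null) {
            builder.name(schema.name());
        }
        for (Field field : schema.fields()) {
            builder.field(field.name(), field.schema());
        }
        String formattedField = timestampField + "_formatted";
        builder.field(formattedField, Schema.OPTIONAL_STRING_SCHEMA);
        Schema updatedSchema = builder.build();

        Struct updated = new Struct(updatedSchema);
        for (Field field : schema.fields()) {
            updated.put(field.name(), value.get(field));
        }
        Long timestamp = value.getInt64(timestampField);
        updated.put(formattedField,
            timestamp == null ? null : formatter.format(Instant.ofEpochMilli(timestamp)));

        // newRecord keeps topic, partition, key, headers and timestamp intact
        return record.newRecord(record.topic(), record.kafkaPartition(),
            record.keySchema(), record.key(),
            updatedSchema, updated, record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
        // No resources to release
    }
}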

Dead Letter Queue Pattern

{
  "name": "sink-with-dlq",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "enriched-orders",
    "connection.url": "http://elasticsearch:9200",
    
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dead-letter-queue",
    "errors.deadletterqueue.topic.replication.factor": 3,
    "errors.deadletterqueue.context.headers.enable": true,
    
    "errors.log.enable": true,
    "errors.log.include.messages": true
  }
}

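Important: Dead letter queues are supported for sink connectors only. Setting errors.tolerance=all is required for the dead letter queue to be used; with the default (none), the task fails on the first conversion or transformation error. Errors raised while writing to the external system reach the dead letter queue only if the connector reports them explicitly.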
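
With errors.deadletterqueue.context.headers.enable=true, each dead-lettered record carries headers describing where and why it failed. The sketch below turns such headers into a readable dict. It assumes headers arrive as (name, bytes) pairs, as common Python Kafka clients expose them, and that the context headers use the `__connect.errors.` prefix with UTF-8 string values; verify the exact names against your Kafka version. The function name and sample data are made up for illustration.

```python
CONTEXT_PREFIX = "__connect.errors."


def summarize_dlq_headers(headers):
    """Collect error-context headers from a list of (name, bytes) pairs."""
    context = {}
    for name, raw in headers:
        if name.startswith(CONTEXT_PREFIX) and raw is not None:
            short_name = name[len(CONTEXT_PREFIX):]
            context[short_name] = raw.decode("utf-8", errors="replace")
    return context


# Hypothetical headers from one dead-lettered record
sample_headers = [
    ("__connect.errors.topic", b"enriched-orders"),
    ("__connect.errors.partition", b"3"),
    ("__connect.errors.offset", b"1024"),
    ("__connect.errors.stage", b"VALUE_CONVERTER"),
    ("__connect.errors.exception.message", b"Failed to deserialize data"),
    ("trace-id", b"abc"),  # unrelated application header, ignored
]
summary = summarize_dlq_headers(sample_headers)
print(summary)
```

Grouping dead-lettered records by stage and exception message is usually enough to tell a schema problem from a one-off bad payload before deciding whether to replay them.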

Exactly-Once with Connect

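// Source connector. Exactly-once for sources (Kafka 3.3+) is enabled on the workers
// with exactly.once.source.support=enabled; the framework then commits the records
// and their source offsets in a single producer transaction.
@Override
public List<SourceRecord> poll() throws InterruptedException {
    // Read from source system (SourceRow and readFromSource are placeholders)
    List<SourceRow> rows = readFromSource();
    
    // Each SourceRecord must carry its source partition and source offset,
    // so a restarted task resumes exactly where the last transaction ended
    return rows.stream()
        .map(this::toSourceRecord)
        .collect(Collectors.toList());
}

// Sink connector. Delivery to put() is at-least-once, so the write itself
// has to be idempotent or transactional.
@Override
public void put(Collection<SinkRecord> records) {
    if (records.isEmpty()) {
        return;
    }
    try {
        // Process the batch in a single transaction (connection is a placeholder client)
        connection.beginTransaction();
        for (SinkRecord record : records) {
            // Upsert keyed on the record key, so a redelivered record
            // overwrites the same row instead of creating a duplicate
            upsertRecord(record);
        }
        connection.commit();
    } catch (Exception e) {
        // Undo the partial batch and ask the framework to redeliver it
        connection.rollback();
        throw new RetriableException(e);
    }
}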
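
One common way to get effectively exactly-once results in a sink is to store the consumed offsets in the destination, in the same transaction as the data, and skip anything already applied. The sketch below demonstrates the idea with an in-memory SQLite database; the table names, column names and record tuple layout are all assumptions for illustration, not part of Kafka Connect.

```python
import sqlite3


def open_sink():
    """Create an in-memory destination with a data table and an offsets table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (order_id TEXT PRIMARY KEY, status TEXT)")
    conn.execute(
        "CREATE TABLE sink_offsets ("
        "topic TEXT, part INTEGER, next_offset INTEGER, PRIMARY KEY (topic, part))"
    )
    return conn


def put_batch(conn, records):
    """Write a batch and its offsets atomically; skip records already applied."""
    with conn:  # commits on success, rolls back if an exception is raised
        for topic, part, offset, order_id, status in records:
            row = conn.execute(
                "SELECT next_offset FROM sink_offsets WHERE topic = ? AND part = ?",
                (topic, part),
            ).fetchone()
            if row is not None and offset < row[0]:
                continue  # redelivered record, already committed
            conn.execute(
                "INSERT OR REPLACE INTO orders VALUES (?, ?)", (order_id, status)
            )
            conn.execute(
                "INSERT OR REPLACE INTO sink_offsets VALUES (?, ?, ?)",
                (topic, part, offset + 1),
            )


conn = open_sink()
batch = [("orders", 0, 0, "o-1", "NEW"), ("orders", 0, 1, "o-1", "PAID")]
put_batch(conn, batch)
put_batch(conn, batch[:1])  # simulated redelivery of offset 0 after a restart
print(conn.execute("SELECT * FROM orders").fetchall())
```

Without the offset check, the redelivered offset 0 would overwrite the newer PAID status with NEW, which shows why an upsert alone does not always make a sink safe against replays.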

Connector Management

Monitoring

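import requests
import time

class ConnectMonitor:
    def __init__(self, connect_url, timeout=10):
        self.connect_url = connect_url.rstrip('/')
        self.timeout = timeout
    
    def _get(self, path):
        # Fail fast on timeouts and non-2xx responses instead of parsing error bodies
        response = requests.get(f'{self.connect_url}{path}', timeout=self.timeout)
        response.raise_for_status()
        return response.json()
    
    def get_connector_status(self, connector_name):
        return self._get(f'/connectors/{connector_name}/status')
    
    def get_all_connectors(self):
        return self._get('/connectors')
    
    def check_connector(self, connector):
        status = self.get_connector_status(connector)
        state = status['connector']['state']
        if state != 'RUNNING':
            self.alert(connector, state)
        
        # A connector can report RUNNING while individual tasks have FAILED
        for task in status.get('tasks', []):
            if task['state'] != 'RUNNING':
                self.alert(f"{connector} task {task['id']}", task['state'])
    
    def monitor_connectors(self, interval=60):
        while True:
            try:
                for connector in self.get_all_connectors():
                    self.check_connector(connector)
            except requests.RequestException as exc:
                # Keep the loop alive if the REST API is briefly unreachable
                print(f"Connect REST API error: {exc}")
            
            time.sleep(interval)
    
    def alert(self, target, state):
        # Send alert (Slack, PagerDuty, etc.); printing is a placeholder
        print(f"{target} is {state}")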

Follow-Up Questions

  1. What is the difference between a connector and a task in Kafka Connect?
  2. How does Kafka Connect achieve exactly-once delivery for source connectors?
  3. Explain the purpose of dead letter queues and when to use them.
  4. How would you handle schema evolution in a Kafka Connect pipeline?
  5. What are the trade-offs between chaining SMTs inside a connector and moving the logic into a stream processor such as Kafka Streams?
