Kafka Connect Patterns
Difficulty: Senior Level | Companies: LinkedIn, Uber, Netflix, Spotify, Confluent
Content
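Kafka Connect is a framework for streaming data between Kafka and external systems. It scales by distributing tasks across workers and recovers from worker failures by rebalancing those tasks. Exactly-once delivery is available for source connectors from Kafka 3.3 onward, when it is enabled on the workers and supported by the connector. For sink connectors, exactly-once depends on the connector writing idempotently or transactionally to the target system.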
Architecture Overview
Architecture Diagram
Kafka Connect Cluster:
┌─────────────────────────────────────────────────┐
│     Worker 1                Worker 2            │
│  ┌─────────────┐         ┌─────────────┐        │
│  │ Source Task │         │ Source Task │        │
│  │   (JDBC)    │         │   (File)    │        │
│  └─────────────┘         └─────────────┘        │
│         │                       │               │
│         ▼                       ▼               │
│  ┌─────────────┐         ┌─────────────┐        │
│  │  Sink Task  │         │  Sink Task  │        │
│  │  (Elastic)  │         │    (S3)     │        │
│  └─────────────┘         └─────────────┘        │
└─────────────────────────────────────────────────┘
Components:
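- Workers: JVM processes that host connectors and tasks; in distributed mode they form a cluster and rebalance work when a worker joins or fails
- Tasks: Do the actual copying of data to or from Kafka
- Connectors: Define the data flow configuration and split the work into tasks (up to tasks.max)
- Transformations: Modify individual records in flight (Single Message Transforms)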
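One way to see how these components relate at runtime is the REST status endpoint, which reports the worker that hosts the connector and each of its tasks. The sketch below groups task ids by worker from a status payload. The helper name `tasks_by_worker`, the connector name, and the sample values are illustrative assumptions, not output from a real cluster.

```python
from collections import defaultdict


def tasks_by_worker(status):
    """Group task ids by the worker that runs them.

    `status` is the parsed JSON body of GET /connectors/<name>/status.
    """
    placement = defaultdict(list)
    for task in status.get("tasks", []):
        placement[task["worker_id"]].append(task["id"])
    return dict(placement)


# Illustrative payload shaped like a Connect status response (values are made up).
sample_status = {
    "name": "jdbc-source",
    "connector": {"state": "RUNNING", "worker_id": "worker-1:8083"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "worker-1:8083"},
        {"id": 1, "state": "RUNNING", "worker_id": "worker-2:8083"},
        {"id": 2, "state": "RUNNING", "worker_id": "worker-2:8083"},
    ],
}

placement = tasks_by_worker(sample_status)
print(placement)
```

A lopsided placement after a worker restart is often the first visible sign that a rebalance has happened.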
Source Connector Example
{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres.example.com",
    "database.port": "5432",
    "database.user": "kafka-connect",
    "database.password": "${secrets:postgres-password}",
    "database.dbname": "orders_db",
    "database.server.name": "orders",
    "schema.include.list": "public",
    "table.include.list": "public.orders,public.customers",
    "plugin.name": "pgoutput",
    "slot.name": "kafka_connect",
    "publication.name": "kafka_connect_pub",
    "topic.prefix": "cdc",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
  }
}
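The effect of the RegexRouter settings can be checked offline. The sketch below approximates its behavior in Python: a topic is renamed only when the whole name matches the regex, and `$3` refers to the third capture group. The function name `regex_router` and the `archive` schema are assumptions made for illustration; the real transform runs inside the Connect worker.

```python
import re

# The regex from the connector config, after JSON unescaping.
TOPIC_REGEX = r"([^.]+)\.([^.]+)\.([^.]+)"


def regex_router(topic, regex, replacement):
    """Approximate RegexRouter: rename only when the whole topic name matches."""
    pattern = re.compile(regex)
    if pattern.fullmatch(topic) is None:
        return topic  # non-matching topics pass through unchanged
    # Java writes group references as $1; Python's re module expects \1.
    py_replacement = re.sub(r"\$(\d+)", r"\\\1", replacement)
    return pattern.sub(py_replacement, topic, count=1)


routed = regex_router("cdc.public.orders", TOPIC_REGEX, "$3")
unchanged = regex_router("heartbeat", TOPIC_REGEX, "$3")
collision = regex_router("cdc.archive.orders", TOPIC_REGEX, "$3")
print(routed, unchanged, collision)
```

Because only the table name survives, tables with the same name in different schemas would land in the same topic, so this routing is safest when a single schema is captured.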
REST API Deployment
# Deploy source connector
curl -X POST http://connect-cluster:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres-source",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres.example.com",
      "database.port": "5432",
      "database.user": "kafka-connect",
      "database.dbname": "orders_db",
      "topic.prefix": "cdc"
    }
  }'
# Check connector status
curl http://connect-cluster:8083/connectors/postgres-source/status
# Restart connector
curl -X POST http://connect-cluster:8083/connectors/postgres-source/restart
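POST /connectors creates a connector and is rejected if the name already exists, so deployment scripts often use PUT /connectors/<name>/config instead, which creates the connector or updates its configuration. The sketch below only builds that request so it can be inspected without a running cluster. The helper name `build_upsert_request` is an assumption, and the host and config values are carried over from the example above.

```python
import json
import urllib.request


def build_upsert_request(connect_url, name, config):
    """Build a PUT /connectors/<name>/config request (create or update).

    Unlike POST /connectors, the body is the flat config map without the
    {"name": ..., "config": ...} wrapper.
    """
    url = f"{connect_url.rstrip('/')}/connectors/{name}/config"
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


request = build_upsert_request(
    "http://connect-cluster:8083",
    "postgres-source",
    {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres.example.com",
        "topic.prefix": "cdc",
    },
)
print(request.get_method(), request.full_url)
# To send it: urllib.request.urlopen(request, timeout=10)
```

Running the same script twice then converges on one configuration instead of failing on the second run.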
Sink Connector Example
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "enriched-orders",
    "connection.url": "https://elasticsearch:9200",
    "type.name": "_doc",
    "key.ignore": "false",
    "schema.ignore": "false",
    "transforms": "extractKey",
    "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKey.field": "order_id",
    "write.method": "upsert",
    "behavior.on.null.values": "delete",
    "batch.size": 500,
    "linger.ms": 10,
    "flush.timeout.ms": 10000,
    "elastic.security.protocol": "SSL",
    "elastic.https.ssl.truststore.location": "/etc/kafka/secrets/truststore.jks",
    "elastic.https.ssl.truststore.password": "${secrets:truststore-password}"
  }
}
Single Message Transforms (SMTs)
Common Transformations
{
  "transforms": "route,extract,mask",
  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.route.regex": "cdc\\.public\\.(.*)",
  "transforms.route.replacement": "$1",
  "transforms.extract.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extract.field": "order_id",
  "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.mask.fields": "email,phone",
  "transforms.mask.replacement": "****"
}
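Transforms run in the order listed under "transforms", each receiving the output of the previous one. The sketch below mimics the chain above on a plain dictionary to make that ordering visible. It is a simplified model under stated assumptions: real SMTs operate on Connect records with schemas, and the record contents here are made up.

```python
import re


def route(record):
    # RegexRouter: cdc.public.<table> -> <table>
    match = re.fullmatch(r"cdc\.public\.(.*)", record["topic"])
    if match:
        record = {**record, "topic": match.group(1)}
    return record


def extract_key(record):
    # ExtractField$Key: replace the struct key with its order_id field
    return {**record, "key": record["key"]["order_id"]}


def mask(record):
    # MaskField$Value: overwrite the listed fields with a fixed token
    value = dict(record["value"])
    for field in ("email", "phone"):
        if field in value:
            value[field] = "****"
    return {**record, "value": value}


def apply_chain(record, transforms):
    """Apply transforms left to right, as Connect does with the configured list."""
    for transform in transforms:
        record = transform(record)
    return record


record = {
    "topic": "cdc.public.orders",
    "key": {"order_id": 42},
    "value": {"order_id": 42, "email": "a@example.com", "phone": "555-0100"},
}
result = apply_chain(record, [route, extract_key, mask])
print(result)
```

Order matters whenever one transform depends on a field or topic name that another rewrites, for example a predicate that matches on the topic name after routing.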
Custom SMT
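import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

// Adds a formatted copy of an epoch-millis (int64) field to each record value.
// Shares a name with, but is unrelated to, the built-in org.apache.kafka.connect.transforms.TimestampConverter.
public class TimestampConverter<R extends ConnectRecord<R>> implements Transformation<R> {
    private static final String FIELD_CONFIG = "timestamp.field";
    private static final String FORMAT_CONFIG = "target.format";

    private String timestampField;
    private String targetFormat;

    @Override
    public void configure(Map<String, ?> configs) {
        this.timestampField = (String) configs.get(FIELD_CONFIG);
        this.targetFormat = (String) configs.get(FORMAT_CONFIG);
    }

    @Override
    public R apply(R record) {
        // Pass tombstones and schemaless values through untouched.
        if (!(record.value() instanceof Struct)) {
            return record;
        }
        Struct value = (Struct) record.value();
        Long timestamp = value.getInt64(timestampField);
        if (timestamp == null) return record;

        // A Struct is bound to its schema, so adding a field requires a new schema.
        SchemaBuilder builder = SchemaBuilder.struct().name(value.schema().name());
        for (Field field : value.schema().fields()) {
            builder.field(field.name(), field.schema());
        }
        builder.field(timestampField + "_formatted", Schema.OPTIONAL_STRING_SCHEMA);
        Schema updatedSchema = builder.build();

        Struct updatedValue = new Struct(updatedSchema);
        for (Field field : value.schema().fields()) {
            updatedValue.put(field.name(), value.get(field));
        }
        updatedValue.put(timestampField + "_formatted", formatTimestamp(timestamp, targetFormat));

        // newRecord works for both SourceRecord and SinkRecord.
        return record.newRecord(record.topic(), record.kafkaPartition(),
            record.keySchema(), record.key(),
            updatedSchema, updatedValue, record.timestamp());
    }

    private String formatTimestamp(Long timestamp, String format) {
        // SimpleDateFormat is not thread-safe, so create one per call.
        return new SimpleDateFormat(format).format(new Date(timestamp));
    }

    @Override
    public ConfigDef config() {
        // Keys must match the ones read in configure().
        return new ConfigDef()
            .define(FIELD_CONFIG, Type.STRING, Importance.HIGH, "Name of the epoch-millis field to format")
            .define(FORMAT_CONFIG, Type.STRING, Importance.HIGH, "SimpleDateFormat pattern for the output");
    }

    @Override
    public void close() {
        // No resources to release.
    }
}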
Dead Letter Queue Pattern
{
  "name": "elasticsearch-sink-with-dlq",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "enriched-orders",
    "connection.url": "http://elasticsearch:9200",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dead-letter-queue",
    "errors.deadletterqueue.topic.replication.factor": 3,
    "errors.deadletterqueue.context.headers.enable": true,
    "errors.log.enable": true,
    "errors.log.include.messages": true
  }
}
⚠️ Important: Dead letter queues are supported only for sink connectors; the errors.deadletterqueue.* settings have no effect on a source connector such as Debezium. Setting errors.tolerance=all is required for failed records to reach the DLQ. With the default (none), the task fails on the first record that cannot be converted or transformed.
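With errors.deadletterqueue.context.headers.enable set to true, each DLQ record carries headers prefixed with `__connect.errors.` that identify where the record came from and why it failed. The sketch below summarizes those headers. It assumes headers arrive as a list of (key, bytes) pairs, which is how common Python Kafka clients expose them, and the sample values are invented for illustration.

```python
CONTEXT_PREFIX = "__connect.errors."


def describe_failure(headers):
    """Summarize a DLQ record from its error-context headers.

    `headers` is a list of (key, bytes) pairs taken from a consumed record.
    """
    context = {
        key[len(CONTEXT_PREFIX):]: value.decode("utf-8")
        for key, value in headers
        if key.startswith(CONTEXT_PREFIX) and value is not None
    }
    return {
        "source": f'{context.get("topic")}-{context.get("partition")}@{context.get("offset")}',
        "stage": context.get("stage"),
        "error": context.get("exception.class.name"),
        "message": context.get("exception.message"),
    }


# Hand-written sample; real values come from the failed record.
sample_headers = [
    ("__connect.errors.topic", b"enriched-orders"),
    ("__connect.errors.partition", b"3"),
    ("__connect.errors.offset", b"1042"),
    ("__connect.errors.stage", b"VALUE_CONVERTER"),
    ("__connect.errors.exception.class.name", b"org.apache.kafka.connect.errors.DataException"),
    ("__connect.errors.exception.message", b"Failed to deserialize data"),
    ("trace-id", b"abc123"),  # unrelated application header, ignored
]
summary = describe_failure(sample_headers)
print(summary)
```

Grouping DLQ records by stage and exception class is usually enough to tell a schema problem from a bad transform before deciding whether to replay them.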
Exactly-Once with Connect
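// Source side: Kafka Connect supports exactly-once for source connectors from Kafka 3.3
// (KIP-618) when workers run with exactly.once.source.support=enabled. The framework then
// writes records and their source offsets in one producer transaction. The task's part is
// to attach an accurate source partition and offset to every record.
@Override
public List<SourceRecord> poll() throws InterruptedException {
    // Read the next batch from the source system (SourceRow is a placeholder type)
    List<SourceRow> rows = readFromSource();
    // toSourceRecord must set sourcePartition and sourceOffset so a restart resumes correctly
    return rows.stream()
        .map(this::toSourceRecord)
        .collect(Collectors.toList());
}

// Sink side: delivery to put() is at-least-once, so the effect is made exactly-once by
// writing idempotently (upserts keyed on the record key) inside one transaction.
@Override
public void put(Collection<SinkRecord> records) {
    connection.beginTransaction();
    try {
        for (SinkRecord record : records) {
            // Upsert so a redelivered record overwrites instead of duplicating
            upsertRecord(record);
        }
        connection.commit();
    } catch (Exception e) {
        // Undo the partial batch and ask the framework to redeliver it
        connection.rollback();
        throw new RetriableException(e);
    }
}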
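The sink-side idea, one transaction per batch plus upserts keyed on the record key, can be demonstrated with an in-memory SQLite database. This is a minimal sketch rather than a Connect task: the `orders` table, its columns, and the `put_batch` helper are assumptions, and the ON CONFLICT clause needs SQLite 3.24 or newer.

```python
import sqlite3


def put_batch(conn, records):
    """Write a batch atomically; replays overwrite rather than duplicate."""
    with conn:  # commits on success, rolls back if an exception escapes
        conn.executemany(
            """
            INSERT INTO orders (order_id, status) VALUES (?, ?)
            ON CONFLICT(order_id) DO UPDATE SET status = excluded.status
            """,
            records,
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER PRIMARY KEY, status TEXT)")

batch = [(1, "created"), (2, "created"), (1, "paid")]
put_batch(conn, batch)
put_batch(conn, batch)  # simulate redelivery of the same batch after a restart

rows = conn.execute("SELECT order_id, status FROM orders ORDER BY order_id").fetchall()
print(rows)
```

The table ends in the same state no matter how many times the batch is delivered, which is what makes at-least-once delivery safe for the target system.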
Connector Management
Monitoring
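import time

import requests


class ConnectMonitor:
    def __init__(self, connect_url, timeout=10):
        self.connect_url = connect_url.rstrip('/')
        self.timeout = timeout  # seconds, so a hung worker cannot block the loop

    def get_connector_status(self, connector_name):
        response = requests.get(
            f'{self.connect_url}/connectors/{connector_name}/status',
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()

    def get_all_connectors(self):
        response = requests.get(f'{self.connect_url}/connectors', timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def check_once(self):
        for connector in self.get_all_connectors():
            status = self.get_connector_status(connector)
            state = status['connector']['state']
            if state != 'RUNNING':
                print(f"Connector {connector} is {state}")
                self.alert(connector, state)
            # A connector can report RUNNING while individual tasks have FAILED.
            for task in status.get('tasks', []):
                if task['state'] != 'RUNNING':
                    print(f"Connector {connector} task {task['id']} is {task['state']}")
                    self.alert(connector, f"task {task['id']} {task['state']}")

    def monitor_connectors(self, interval_seconds=60):
        while True:
            try:
                self.check_once()
            except requests.RequestException as exc:
                # Keep polling if the REST API is briefly unavailable.
                print(f"Connect REST API unreachable: {exc}")
            time.sleep(interval_seconds)

    def alert(self, connector, state):
        # Send alert (Slack, PagerDuty, etc.)
        pass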
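Alerting is often paired with an automatic restart of failed tasks through POST /connectors/<name>/tasks/<id>/restart. The sketch below only computes which restart paths would be called for a given status payload, so it can be reviewed or logged before anything is sent. The helper name and the sample payload are assumptions for illustration.

```python
def failed_task_restart_paths(connector_name, status):
    """Return the REST paths that would restart each FAILED task."""
    return [
        f"/connectors/{connector_name}/tasks/{task['id']}/restart"
        for task in status.get("tasks", [])
        if task["state"] == "FAILED"
    ]


# Illustrative status payload (values are made up).
sample_status = {
    "name": "elasticsearch-sink",
    "connector": {"state": "RUNNING", "worker_id": "worker-1:8083"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "worker-1:8083"},
        {"id": 1, "state": "FAILED", "worker_id": "worker-2:8083"},
    ],
}

paths = failed_task_restart_paths("elasticsearch-sink", sample_status)
print(paths)
```

Capping the number of automatic restarts per task helps avoid a restart loop when the failure is caused by bad data rather than a transient outage.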
Follow-Up Questions
- What is the difference between a connector and a task in Kafka Connect?
- How does Kafka Connect achieve exactly-once delivery for source connectors?
- Explain the purpose of dead letter queues and when to use them.
- How would you handle schema evolution in a Kafka Connect pipeline?
- What are the trade-offs between using a single SMT and chaining several SMTs?