Kafka Security
Overview
Kafka provides security features that protect an event streaming platform along four dimensions: authentication, authorization, encryption, and auditing. This guide walks through configuring each layer.
Security Requirements
- Confidentiality: Data encrypted in transit and at rest
- Integrity: Messages cannot be tampered with
- Authentication: Verify identity of clients and brokers
- Authorization: Control access to topics and operations
- Auditability: Track all access and operations
SSL/TLS Encryption
Certificate Generation
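# Generate the CA key and self-signed certificate
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 \
-subj "/CN=Kafka-CA" -passout pass:ca-password
# Generate the broker key pair in a keystore
keytool -keystore kafka.server.keystore.jks -storetype JKS \
-alias kafka-broker -validity 365 -genkeypair \
-keyalg RSA -keysize 2048 \
-dname "CN=kafka-broker,OU=IT,O=Company" \
-storepass keystore-password -keypass key-password
# Create a signing request and sign the broker certificate with the CA
keytool -keystore kafka.server.keystore.jks -alias kafka-broker \
-certreq -file broker-csr -storepass keystore-password -keypass key-password
openssl x509 -req -CA ca-cert -CAkey ca-key -in broker-csr \
-out broker-cert-signed -days 365 -CAcreateserial -passin pass:ca-password
# Import the CA certificate, then the signed broker certificate, into the keystore
keytool -keystore kafka.server.keystore.jks -alias ca-cert \
-importcert -file ca-cert -storepass keystore-password -noprompt
keytool -keystore kafka.server.keystore.jks -alias kafka-broker \
-importcert -file broker-cert-signed -storepass keystore-password -keypass key-password
# Import the CA certificate into the broker truststore
keytool -keystore kafka.server.truststore.jks \
-alias ca-cert -importcert -file ca-cert \
-storepass truststore-password -noprompt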
Broker SSL Configuration
# server.properties
listeners=SSL://0.0.0.0:9093
ssl.keystore.location=/var/kafka/ssl/kafka.server.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.location=/var/kafka/ssl/kafka.server.truststore.jks
ssl.truststore.password=truststore-password
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.3
# TLS 1.3 uses its own suite names, so list one alongside the TLS 1.2 suite
ssl.cipher.suites=TLS_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
Client SSL Configuration
# client.properties
ssl.truststore.location=/var/kafka/ssl/kafka.client.truststore.jks
ssl.truststore.password=truststore-password
ssl.keystore.location=/var/kafka/ssl/kafka.client.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
security.protocol=SSL
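Python clients read PEM files rather than JKS stores, so the same policy (verify the broker, require TLS 1.2 or newer, optionally present a client certificate) has to be expressed differently. The following is a minimal sketch using only the standard library; the function name `build_tls_context` and its parameters are illustrative and not part of any Kafka API.

```python
import ssl


def build_tls_context(cafile=None, certfile=None, keyfile=None, key_password=None):
    """Return a client-side SSLContext that refuses anything older than TLS 1.2."""
    # create_default_context() turns on certificate and hostname verification.
    context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=cafile)
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    if certfile:
        # Client certificate for mutual TLS (needed when ssl.client.auth=required).
        context.load_cert_chain(certfile, keyfile=keyfile, password=key_password)
    return context


# With a private CA, pass cafile='/var/kafka/ssl/ca-cert' (PEM format).
context = build_tls_context()
print(context.minimum_version.name, context.verify_mode.name)
```

kafka-python accepts a pre-built context through the `ssl_context` argument of `KafkaProducer` and `KafkaConsumer`; when it is supplied, the individual `ssl_*` options are ignored.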
SASL Authentication
SASL Mechanisms Comparison
| Mechanism | Description | Use Case |
|---|---|---|
| PLAIN | Username/password | Development, simple setups |
| SCRAM-SHA-256/512 | Salted challenge-response; credentials stored in Kafka | Production without Kerberos |
| GSSAPI | Kerberos | Enterprise environments |
| OAUTHBEARER | OAuth 2.0 tokens | Cloud-native applications |
SASL/SCRAM Setup
# Create SCRAM credentials
kafka-configs.sh --bootstrap-server localhost:9092 \
--alter \
--add-config 'SCRAM-SHA-256=[iterations=8192,password=admin-secret],SCRAM-SHA-512=[iterations=8192,password=admin-secret]' \
--entity-type users --entity-name admin
kafka-configs.sh --bootstrap-server localhost:9092 \
--alter \
--add-config 'SCRAM-SHA-256=[iterations=8192,password=app-secret]' \
--entity-type users --entity-name app-user
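The `iterations` value in the commands above controls how expensive it is to derive keys from a password. As a rough illustration of what SCRAM stores instead of the plaintext password, the sketch below follows the key derivation in RFC 5802. It is simplified (no SASLprep normalization) and does not reproduce Kafka's internal storage format; `derive_scram_keys` is a name chosen for this example.

```python
import hashlib
import hmac
import os


def derive_scram_keys(password, salt, iterations=8192, hash_name="sha256"):
    """Derive the StoredKey and ServerKey described in RFC 5802."""
    # SaltedPassword = PBKDF2-HMAC(password, salt, iterations)
    salted = hashlib.pbkdf2_hmac(hash_name, password.encode("utf-8"), salt, iterations)
    # ClientKey = HMAC(SaltedPassword, "Client Key"); StoredKey = H(ClientKey)
    client_key = hmac.new(salted, b"Client Key", hash_name).digest()
    stored_key = hashlib.new(hash_name, client_key).digest()
    # ServerKey = HMAC(SaltedPassword, "Server Key")
    server_key = hmac.new(salted, b"Server Key", hash_name).digest()
    return stored_key, server_key


salt = os.urandom(16)  # a fresh random salt per credential
stored_key, server_key = derive_scram_keys("app-secret", salt)
print(stored_key.hex())
print(server_key.hex())
```

Because only the salt, iteration count, and derived keys need to be kept, a leaked credential store does not directly reveal passwords, and a higher iteration count makes offline guessing slower.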
SASL Configuration
# server.properties
listeners=SASL_SSL://0.0.0.0:9093
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256,SCRAM-SHA-512
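# JAAS configuration for the broker; the property name is prefixed with the
# lowercase listener name and SASL mechanism
listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="admin-secret";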
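On brokers, Kafka expects a JAAS entry in `server.properties` to be scoped to a listener and mechanism through a property name of the form `listener.name.<listener>.<mechanism>.sasl.jaas.config`. When such entries are generated by tooling, a small helper keeps the naming consistent. This is a sketch only: `broker_jaas_property` is a hypothetical helper, and it rejects quotes and backslashes in credentials rather than guessing at escaping rules.

```python
def broker_jaas_property(listener_name, mechanism, username, password):
    """Return (key, value) for a listener- and mechanism-scoped SCRAM JAAS entry."""
    for field in (username, password):
        if '"' in field or "\\" in field:
            # Escaping is deliberately out of scope for this sketch.
            raise ValueError("credentials containing quotes or backslashes are not supported")
    key = f"listener.name.{listener_name.lower()}.{mechanism.lower()}.sasl.jaas.config"
    value = (
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        f'username="{username}" password="{password}";'
    )
    return key, value


key, value = broker_jaas_property("SASL_SSL", "SCRAM-SHA-256", "admin", "admin-secret")
print(f"{key}={value}")
```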
Python Client with SASL
from kafka import KafkaProducer
from kafka import KafkaConsumer

# Producer with SASL
producer = KafkaProducer(
    bootstrap_servers=['kafka-broker:9093'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_username='app-user',
    sasl_plain_password='app-secret',
    ssl_cafile='/var/kafka/ssl/ca-cert'
)

# Consumer with SASL
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['kafka-broker:9093'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_username='consumer-user',
    sasl_plain_password='consumer-secret',
    ssl_cafile='/var/kafka/ssl/ca-cert',
    group_id='order-processor'
)
ACL Authorization
ACL Structure
ACLs follow the pattern: Principal + Resource + Operation + Permission
# Grant read access to topic for specific user
kafka-acls.sh --bootstrap-server localhost:9093 \
--add \
--allow-principal User:app-user \
--operation Read \
--topic orders \
--command-config admin.properties
# Grant write access to topic
kafka-acls.sh --bootstrap-server localhost:9093 \
--add \
--allow-principal User:app-user \
--operation Write \
--topic orders \
--command-config admin.properties
# List ACLs for a topic
kafka-acls.sh --bootstrap-server localhost:9093 \
--list \
--topic orders \
--command-config admin.properties
Common ACL Patterns
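# Producer ACLs: write to and describe the topic
kafka-acls.sh --bootstrap-server localhost:9093 --command-config admin.properties \
--add --allow-principal User:producer \
--operation Write --operation Describe \
--topic orders
# Consumer ACLs: read the topic and use the consumer group
kafka-acls.sh --bootstrap-server localhost:9093 --command-config admin.properties \
--add --allow-principal User:consumer \
--operation Read \
--topic orders --group order-processor
# Admin ACLs: all operations on every topic and group
kafka-acls.sh --bootstrap-server localhost:9093 --command-config admin.properties \
--add --allow-principal User:admin \
--operation All \
--topic '*' --group '*'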
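When the same grants are applied across many topics or environments, generating the `kafka-acls.sh` invocation from a small role table avoids copy-paste mistakes. The sketch below only builds argument lists and prints them; the `ROLES` table and `acl_command` function are illustrative names, and the default broker address and `admin.properties` path are taken from the commands earlier in this section.

```python
import shlex


def acl_command(principal, operations, topic=None, group=None,
                bootstrap_server="localhost:9093", command_config="admin.properties"):
    """Build the kafka-acls.sh argument list that grants operations to a principal."""
    if topic is None and group is None:
        raise ValueError("specify a topic, a group, or both")
    args = ["kafka-acls.sh", "--bootstrap-server", bootstrap_server,
            "--command-config", command_config,
            "--add", "--allow-principal", f"User:{principal}"]
    for operation in operations:
        args += ["--operation", operation]
    if topic is not None:
        args += ["--topic", topic]
    if group is not None:
        args += ["--group", group]
    return args


# Hypothetical role table mirroring the producer, consumer, and admin patterns.
ROLES = {
    "producer": {"operations": ["Write", "Describe"], "topic": "orders"},
    "consumer": {"operations": ["Read"], "topic": "orders", "group": "order-processor"},
    "admin": {"operations": ["All"], "topic": "*", "group": "*"},
}

for principal, grant in ROLES.items():
    # shlex.join quotes arguments such as * so the line is safe to paste into a shell.
    print(shlex.join(acl_command(principal, **grant)))
```

The returned list can be passed directly to `subprocess.run` without going through a shell, which sidesteps quoting issues with the `*` wildcard.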
ACL Authorizer Configuration
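# server.properties
# AclAuthorizer applies to ZooKeeper-based clusters; KRaft clusters use
# org.apache.kafka.metadata.authorizer.StandardAuthorizer instead
authorizer.class.name=kafka.security.authorizer.AclAuthorizer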
super.users=User:admin
allow.everyone.if.no.acl.found=false
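The interaction between `super.users`, deny rules, allow rules, and `allow.everyone.if.no.acl.found` is easier to reason about as a decision procedure. The model below is a deliberately simplified sketch: it ignores hosts, wildcard and prefixed resources, and implied operations, and the `Acl` class and `is_authorized` function are names invented for this example rather than Kafka APIs.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Acl:
    principal: str   # e.g. "User:app-user"
    resource: str    # e.g. "topic:orders"
    operation: str   # e.g. "Read", "Write", or "All"
    permission: str  # "Allow" or "Deny"


def is_authorized(principal, resource, operation, acls,
                  super_users=frozenset(), allow_if_no_acl=False):
    """Simplified authorization decision for one request."""
    # 1. Super users bypass ACL checks entirely.
    if principal in super_users:
        return True
    # 2. With no ACLs on the resource, the allow.everyone setting decides.
    on_resource = [acl for acl in acls if acl.resource == resource]
    if not on_resource:
        return allow_if_no_acl
    matching = [acl for acl in on_resource
                if acl.principal == principal and acl.operation in (operation, "All")]
    # 3. A matching Deny always wins over a matching Allow.
    if any(acl.permission == "Deny" for acl in matching):
        return False
    # 4. Otherwise access requires an explicit Allow.
    return any(acl.permission == "Allow" for acl in matching)


acls = [
    Acl("User:app-user", "topic:orders", "Read", "Allow"),
    Acl("User:app-user", "topic:orders", "Write", "Deny"),
]
print(is_authorized("User:app-user", "topic:orders", "Read", acls))
print(is_authorized("User:app-user", "topic:orders", "Write", acls))
```

With `allow_if_no_acl=False`, a topic that has no ACLs at all is inaccessible to everyone except super users, which is why the setting shown above is the safer default.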
Encryption at Rest
Broker Configuration
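# Kafka does not encrypt log segments itself; place them on a volume that is
# encrypted at the disk or filesystem level (for example LUKS/dm-crypt or an encrypted cloud disk)
log.dirs=/encrypted/kafka-logs
# Compression only reduces size; it does not provide confidentiality
compression.type=lz4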
Client-Side Encryption
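import json
from cryptography.fernet import Fernet
from kafka import KafkaProducer

class EncryptedProducer:
    """Encrypts each JSON message with Fernet before sending it to Kafka."""

    def __init__(self, bootstrap_servers, encryption_key):
        # No value_serializer here: send() already produces encrypted bytes,
        # and json.dumps() cannot serialize bytes.
        self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
        self.cipher = Fernet(encryption_key)

    def send(self, topic, message, key=None):
        # Serialize to JSON, then encrypt; the broker only ever sees ciphertext.
        encrypted_value = self.cipher.encrypt(json.dumps(message).encode('utf-8'))
        return self.producer.send(topic, key=key, value=encrypted_value)

    def flush(self):
        self.producer.flush()

# Usage
# Generate the key once and share it with consumers through a secret store;
# a fresh key per run would leave earlier messages undecryptable.
key = Fernet.generate_key()
producer = EncryptedProducer(['kafka:9092'], key)
producer.send('sensitive-orders', {'amount': 100, 'user': 'john'})
producer.flush()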
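A consumer needs the matching decryption step, and it should treat a payload that fails authentication as untrusted rather than crash. The sketch below shows only the encrypt and decrypt round trip, without a broker; `encrypt_payload` and `decrypt_payload` are illustrative helper names, and it assumes the same Fernet key is available to both sides.

```python
import json

from cryptography.fernet import Fernet, InvalidToken


def encrypt_payload(cipher, message):
    """Serialize a message to JSON and encrypt it."""
    return cipher.encrypt(json.dumps(message).encode("utf-8"))


def decrypt_payload(cipher, token):
    """Return the decoded message, or None if the token fails authentication."""
    try:
        return json.loads(cipher.decrypt(token).decode("utf-8"))
    except InvalidToken:
        # Wrong key or tampered ciphertext: do not trust the payload.
        return None


key = Fernet.generate_key()
cipher = Fernet(key)
token = encrypt_payload(cipher, {"amount": 100, "user": "john"})

print(decrypt_payload(cipher, token))
# A different key cannot decrypt the message.
print(decrypt_payload(Fernet(Fernet.generate_key()), token))
```

In a consumer loop, `decrypt_payload` would be applied to each record's value, for example by passing it as the consumer's `value_deserializer`.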
Security Best Practices
1. Network Security
# Bind each named listener to a specific interface (ports are examples)
listeners=INTERNAL://10.0.1.10:9093,EXTERNAL://10.0.1.10:9094
# Map listener names to security protocols
listener.security.protocol.map=INTERNAL:SSL,EXTERNAL:SASL_SSL
inter.broker.listener.name=INTERNAL
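A listener that silently falls back to an unencrypted protocol is an easy mistake to miss in review. As a sketch of an automated check, the code below parses `server.properties` text and reports listeners whose protocol is neither `SSL` nor `SASL_SSL`. The helper names are made up for this example, and the fallback of treating an unmapped listener name as its own protocol is a simplification of how Kafka resolves the map.

```python
ENCRYPTED_PROTOCOLS = {"SSL", "SASL_SSL"}


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def unencrypted_listeners(props):
    """Return the names of listeners that do not use SSL or SASL_SSL."""
    protocol_map = {}
    for entry in props.get("listener.security.protocol.map", "").split(","):
        if entry.strip():
            name, _, protocol = entry.partition(":")
            protocol_map[name.strip()] = protocol.strip()
    insecure = []
    for listener in props.get("listeners", "").split(","):
        if not listener.strip():
            continue
        name = listener.split("://", 1)[0].strip()
        # Simplification: an unmapped name such as SSL is treated as the protocol itself.
        protocol = protocol_map.get(name, name)
        if protocol.upper() not in ENCRYPTED_PROTOCOLS:
            insecure.append(name)
    return insecure


sample = """
listeners=INTERNAL://10.0.1.10:9093,EXTERNAL://10.0.1.10:9094
listener.security.protocol.map=INTERNAL:SSL,EXTERNAL:SASL_PLAINTEXT
"""
print(unencrypted_listeners(parse_properties(sample)))
```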
2. Credential Management
# Read credentials from environment variables instead of hard-coding them
import os
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=os.environ['KAFKA_BOOTSTRAP_SERVERS'],
    # Without these two settings the SASL credentials below are ignored
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_username=os.environ['KAFKA_SASL_USERNAME'],
    sasl_plain_password=os.environ['KAFKA_SASL_PASSWORD'],
    ssl_cafile=os.environ['KAFKA_SSL_CAFILE']
)
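Reading each variable inline fails with a bare `KeyError` on the first one that is missing. A small loader that validates all of them up front gives a clearer error and keeps secret values out of the message. This is a sketch: `kafka_settings_from_env` is a hypothetical helper, and the `SASL_SSL` and `SCRAM-SHA-256` values assume the setup used elsewhere in this guide.

```python
import os

REQUIRED_VARIABLES = (
    "KAFKA_BOOTSTRAP_SERVERS",
    "KAFKA_SASL_USERNAME",
    "KAFKA_SASL_PASSWORD",
    "KAFKA_SSL_CAFILE",
)


def kafka_settings_from_env(env=None):
    """Build client keyword arguments from environment variables, failing fast."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARIABLES if not env.get(name)]
    if missing:
        # Report variable names only, never their values.
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    return {
        "bootstrap_servers": env["KAFKA_BOOTSTRAP_SERVERS"].split(","),
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "SCRAM-SHA-256",
        "sasl_plain_username": env["KAFKA_SASL_USERNAME"],
        "sasl_plain_password": env["KAFKA_SASL_PASSWORD"],
        "ssl_cafile": env["KAFKA_SSL_CAFILE"],
    }


# Example with an explicit mapping; in an application, call it with no arguments.
example_env = {
    "KAFKA_BOOTSTRAP_SERVERS": "kafka-1:9093,kafka-2:9093",
    "KAFKA_SASL_USERNAME": "app-user",
    "KAFKA_SASL_PASSWORD": "app-secret",
    "KAFKA_SSL_CAFILE": "/var/kafka/ssl/ca-cert",
}
settings = kafka_settings_from_env(example_env)
print(settings["bootstrap_servers"])
```

The resulting dictionary can be unpacked into the client constructor, as in `KafkaProducer(**settings)`.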
3. Regular Credential Rotation
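#!/bin/bash
# rotate_scram_credentials.sh
set -euo pipefail
umask 077  # keep the generated files readable by the owner only
# Hex output avoids characters such as '=' and ',' that could confuse config parsing
NEW_PASSWORD=$(openssl rand -hex 32)
kafka-configs.sh --bootstrap-server localhost:9093 \
--command-config admin.properties \
--alter \
--add-config "SCRAM-SHA-256=[password=$NEW_PASSWORD]" \
--entity-type users --entity-name app-user
# Update application configuration atomically
echo "SCRAM_PASSWORD=$NEW_PASSWORD" > /etc/kafka/.env.new
mv /etc/kafka/.env.new /etc/kafka/.env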
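The write-then-rename step at the end of the script is what keeps readers from seeing a half-written file. The same idea can be sketched in Python with owner-only permissions; `write_secret_file` is an illustrative name, and the example writes into a temporary directory rather than `/etc/kafka`.

```python
import os
import secrets
import tempfile


def write_secret_file(path, name, value):
    """Atomically replace path with a single NAME=value line, readable by the owner only."""
    directory = os.path.dirname(os.path.abspath(path))
    # mkstemp creates the file with mode 0600 on POSIX systems.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".env.")
    try:
        with os.fdopen(fd, "w") as handle:
            handle.write(f"{name}={value}\n")
            handle.flush()
            os.fsync(handle.fileno())
        # os.replace is atomic when source and target are on the same filesystem.
        os.replace(tmp_path, path)
    finally:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)


with tempfile.TemporaryDirectory() as directory:
    env_path = os.path.join(directory, ".env")
    new_password = secrets.token_hex(32)  # 64 hex characters, no shell-special symbols
    write_secret_file(env_path, "SCRAM_PASSWORD", new_password)
    with open(env_path) as handle:
        print(handle.read().split("=")[0])
```

Creating the temporary file in the target directory matters: a rename across filesystems is not atomic.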
4. Security Checklist
- SSL/TLS enabled for all listeners
- SASL authentication configured
- ACLs implemented for all topics
- Super users limited to emergency access only
- Credentials stored in secure vault
- Audit logging enabled
- Regular security assessments scheduled
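Some checklist items can be verified mechanically against the broker configuration. The following sketch flags a few of them from a dictionary of `server.properties` values; `audit_broker_config` is a hypothetical helper, the set of protocols treated as weak is an assumption of this example, and items such as vault storage or audit logging still need manual review.

```python
WEAK_PROTOCOLS = {"SSLv3", "TLSv1", "TLSv1.1"}


def audit_broker_config(props):
    """Return a list of findings; an empty list means none of these checks failed."""
    findings = []
    if not props.get("authorizer.class.name", "").strip():
        findings.append("authorizer.class.name is not set, so ACLs are not enforced")
    if props.get("allow.everyone.if.no.acl.found", "false").strip().lower() == "true":
        findings.append("allow.everyone.if.no.acl.found=true opens resources without ACLs")
    enabled = {p.strip() for p in props.get("ssl.enabled.protocols", "").split(",") if p.strip()}
    weak = sorted(enabled & WEAK_PROTOCOLS)
    if weak:
        findings.append("weak TLS protocols enabled: " + ", ".join(weak))
    listener_text = props.get("listeners", "") + "," + props.get("listener.security.protocol.map", "")
    if "PLAINTEXT" in listener_text.upper():
        findings.append("at least one listener uses PLAINTEXT or SASL_PLAINTEXT")
    return findings


config = {
    "listeners": "PLAINTEXT://0.0.0.0:9092",
    "ssl.enabled.protocols": "TLSv1,TLSv1.2",
}
for finding in audit_broker_config(config):
    print("-", finding)
```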
Summary
Kafka security requires a defense-in-depth approach implementing SSL/TLS encryption, SASL authentication, ACL authorization, and comprehensive auditing. Follow security best practices and regularly audit your configuration to maintain a secure event streaming platform.