Kafka Security: SASL, SSL, ACLs, Encryption at Rest
Difficulty: Senior | Asked at: Finance, Healthcare, Enterprise companies
ℹ️Interview Context
Security is critical for production Kafka deployments. Interviewers expect you to understand authentication, authorization, encryption, and how to implement defense-in-depth strategies.
The Question
Explain how to secure a Kafka cluster. What are the different authentication mechanisms (SASL)? How do SSL/TLS encryption and ACL authorization work? How do you implement encryption at rest?
Security Architecture Overview
Producer/Broker/Consumer
|
+--> Authentication (SASL)
| Proves identity
|
+--> Encryption (SSL/TLS)
| Protects data in transit
|
+--> Authorization (ACLs)
| Controls what authenticated users can do
|
+--> Encryption at Rest
| Protects data on disk
|
+--> Audit Logging
Records all access
SASL Authentication Mechanisms
SASL/PLAIN
# broker.properties
listeners=SASL_PLAINTEXT://0.0.0.0:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
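# JAAS configuration (kafka_server_jaas.conf)
# username/password are what this broker presents to other brokers, so they must
# also appear as a user_<name> entry (user_admin) or inter-broker auth fails
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_producer="producer-secret"
user_consumer="consumer-secret";
};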
# Producer with SASL/PLAIN
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
security_protocol='SASL_PLAINTEXT',
sasl_mechanism='PLAIN',
sasl_plain_username='producer',
sasl_plain_password='producer-secret'
)
SASL/SCRAM
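# broker.properties
listeners=SASL_SSL://0.0.0.0:9093
security.inter.broker.protocol=SASL_SSL
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
# Create SCRAM credentials (Kafka 2.7+ can do this through the brokers;
# the older --zookeeper form only applies to ZooKeeper-based clusters)
# kafka-configs.sh --bootstrap-server localhost:9093 \
# --command-config admin.properties \
# --alter --add-config 'SCRAM-SHA-256=[password=alice-secret]' \
# --entity-type users --entity-name alice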
# Producer with SASL/SCRAM
producer = KafkaProducer(
bootstrap_servers='localhost:9093',
security_protocol='SASL_SSL',
sasl_mechanism='SCRAM-SHA-256',
sasl_plain_username='alice',
sasl_plain_password='alice-secret',
ssl_cafile='/path/to/ca.crt'
)
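SCRAM is stronger than PLAIN because the password never crosses the wire. Instead, the client proves it knows the password by using a salted, iterated hash. The sketch below follows the key derivation from RFC 5802, the SCRAM specification, using only the standard library. Three simplifications apply: `auth_message` is a placeholder for the real concatenated handshake messages, the salt and iteration count are illustrative, and the helper names are hypothetical.

```python
import hashlib
import hmac
import os

def salted_password(password: str, salt: bytes, iterations: int) -> bytes:
    # Hi() in RFC 5802 is PBKDF2; SCRAM-SHA-256 uses HMAC-SHA-256
    return hashlib.pbkdf2_hmac('sha256', password.encode(), salt, iterations)

def client_proof(password: str, salt: bytes, iterations: int, auth_message: bytes) -> bytes:
    salted = salted_password(password, salt, iterations)
    client_key = hmac.new(salted, b'Client Key', hashlib.sha256).digest()
    stored_key = hashlib.sha256(client_key).digest()
    signature = hmac.new(stored_key, auth_message, hashlib.sha256).digest()
    # ClientProof = ClientKey XOR ClientSignature
    return bytes(a ^ b for a, b in zip(client_key, signature))

def server_verify(stored_key: bytes, proof: bytes, auth_message: bytes) -> bool:
    # The server keeps only StoredKey (plus salt and iterations), never the password
    signature = hmac.new(stored_key, auth_message, hashlib.sha256).digest()
    recovered_client_key = bytes(a ^ b for a, b in zip(proof, signature))
    return hmac.compare_digest(hashlib.sha256(recovered_client_key).digest(), stored_key)

# Registration: derive and store StoredKey for user alice
salt, iterations = os.urandom(16), 4096
client_key = hmac.new(salted_password('alice-secret', salt, iterations),
                      b'Client Key', hashlib.sha256).digest()
stored_key = hashlib.sha256(client_key).digest()

# Authentication: placeholder for the real client/server handshake messages
auth_message = b'n=alice,r=cnonce,r=cnonce-snonce,s=c2FsdA==,i=4096,c=biws,r=cnonce-snonce'
proof = client_proof('alice-secret', salt, iterations, auth_message)
wrong = client_proof('wrong-password', salt, iterations, auth_message)
print(server_verify(stored_key, proof, auth_message))  # True
print(server_verify(stored_key, wrong, auth_message))  # False
```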
SASL/GSSAPI (Kerberos)
# broker.properties
listeners=SASL_SSL://0.0.0.0:9093
sasl.enabled.mechanisms=GSSAPI
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.kerberos.service.name=kafka
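# Producer with SASL/GSSAPI
# kafka-python reads Kerberos credentials from the ticket cache (it needs the
# `gssapi` package) and has no keytab/principal arguments; get a ticket for the
# client's own principal first, e.g.: kinit -kt /path/to/client.keytab client@REALM
producer = KafkaProducer(
bootstrap_servers='localhost:9093',
security_protocol='SASL_SSL',
sasl_mechanism='GSSAPI',
sasl_kerberos_service_name='kafka',
ssl_cafile='/path/to/ca.crt'
)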
SASL/OAUTHBEARER
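# broker.properties
listeners=SASL_SSL://0.0.0.0:9093
sasl.enabled.mechanisms=OAUTHBEARER
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
# With no further settings Kafka accepts unsecured (unsigned) JWTs, which is for
# testing only; production needs a validating handler (e.g. sasl.oauthbearer.jwks.endpoint.url)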
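# Producer with SASL/OAUTHBEARER
# kafka-python expects an AbstractTokenProvider subclass with a token() method
# (2.0.x import path shown); this client-credentials provider is a hypothetical sketch
import requests
from kafka.oauth.abstract import AbstractTokenProvider
class ClientCredentialsTokenProvider(AbstractTokenProvider):
    def __init__(self, token_endpoint, client_id, client_secret, scope):
        self.token_endpoint, self.scope = token_endpoint, scope
        self.auth = (client_id, client_secret)
    def token(self):
        # OAuth 2.0 client credentials grant; returns the raw access token string
        resp = requests.post(self.token_endpoint, auth=self.auth, timeout=10,
                             data={'grant_type': 'client_credentials', 'scope': self.scope})
        resp.raise_for_status()
        return resp.json()['access_token']
producer = KafkaProducer(
bootstrap_servers='localhost:9093',
security_protocol='SASL_SSL',
sasl_mechanism='OAUTHBEARER',
sasl_oauth_token_provider=ClientCredentialsTokenProvider(
token_endpoint='https://auth.example.com/token',
client_id='kafka-client',
client_secret='client-secret',
scope='kafka'
)
)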
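An OAUTHBEARER token is usually a JWT. Its claims, such as subject, scope, and expiry, are what the broker checks once it has verified the signature. The stdlib sketch below only *decodes* the payload to show what travels in the token. It deliberately skips signature verification, which no security decision may skip. The sample token and its claims are made up for illustration.

```python
import base64
import json

def decode_jwt_claims(token: str) -> dict:
    """Decode a JWT payload WITHOUT verifying its signature (inspection only)."""
    _header_b64, payload_b64, _signature = token.split('.')
    # JWTs use base64url without padding; restore '=' padding before decoding
    padded = payload_b64 + '=' * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))

def _b64url(data: dict) -> str:
    raw = json.dumps(data).encode()
    return base64.urlsafe_b64encode(raw).rstrip(b'=').decode()

# Hypothetical unsigned token: header.payload.(empty signature)
token = '.'.join([
    _b64url({'alg': 'none'}),
    _b64url({'sub': 'kafka-client', 'scope': 'kafka', 'exp': 1700000000}),
    '',
])
claims = decode_jwt_claims(token)
print(claims['sub'], claims['exp'])  # kafka-client 1700000000
```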
SASL Mechanism Comparison
| Mechanism | Security | Complexity | Use Case |
|---|---|---|---|
| PLAIN | Low (cleartext credentials unless wrapped in TLS) | Low | Development; production only over SASL_SSL |
| SCRAM-SHA-256 | Medium | Medium | Production (simple) |
| SCRAM-SHA-512 | High | Medium | Production (enhanced) |
| GSSAPI | High | High | Enterprise (Kerberos) |
| OAUTHBEARER | High | High | Cloud/Microservices |
⚠️SASL/PLAIN Warning
SASL/PLAIN sends credentials in plaintext. Always use with SSL/TLS encryption (SASL_SSL listener). Never use SASL_PLAINTEXT in production.
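To see why PLAIN needs TLS, here is the message a SASL/PLAIN client sends, as defined in RFC 4616: an optional authorization identity, the username, and the password, separated by NUL bytes. Anyone who can read the bytes on a SASL_PLAINTEXT connection can split them apart again. This stdlib sketch uses the example credentials from above, and the function names are hypothetical.

```python
def plain_initial_response(username: str, password: str, authzid: str = '') -> bytes:
    # RFC 4616 message: [authzid] NUL authcid NUL passwd (no hashing, no encryption)
    return f'{authzid}\0{username}\0{password}'.encode('utf-8')

def sniff_credentials(wire_bytes: bytes) -> tuple:
    # What a passive observer of a SASL_PLAINTEXT listener can recover
    _authzid, username, password = wire_bytes.decode('utf-8').split('\0')
    return username, password

msg = plain_initial_response('producer', 'producer-secret')
print(sniff_credentials(msg))  # ('producer', 'producer-secret')
```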
SSL/TLS Encryption
SSL Configuration
# broker.properties
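listeners=SSL://0.0.0.0:9093
security.inter.broker.protocol=SSL
ssl.keystore.location=/var/kafka/ssl/kafka.server.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.location=/var/kafka/ssl/kafka.server.truststore.jks
ssl.truststore.password=truststore-password
# Mutual TLS: "required" rejects clients without a trusted certificate, "requested" makes it optional
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.3
# TLS 1.3 suites plus TLS 1.2 suites; listing only TLS 1.3 suites breaks TLS 1.2 handshakes
ssl.cipher.suites=TLS_AES_256_GCM_SHA384,TLS_CHACHA20_POLY1305_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
# Verify hostnames against certificates (the default since Kafka 2.0)
ssl.endpoint.identification.algorithm=https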
Certificate Generation
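# Generate CA key and certificate
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 \
-subj "/CN=Kafka CA"
# Generate broker keystore (CN should match the hostname clients connect to)
keytool -keystore kafka.server.keystore.jks \
-alias kafka -validity 365 -genkey \
-keyalg RSA -keysize 2048 \
-dname "CN=kafka.example.com"
# Export a certificate signing request from the keystore
keytool -keystore kafka.server.keystore.jks \
-alias kafka -certreq -file broker.csr
# Sign the broker CSR with the CA
openssl x509 -req -CA ca-cert -CAkey ca-key \
-in broker.csr -out broker-cert \
-days 365 -CAcreateserial
# Import CA cert into truststore
keytool -keystore kafka.server.truststore.jks \
-alias CARoot -importcert -file ca-cert
# Import the CA cert, then the signed broker cert, into the keystore
# (keytool needs the CA there to build the certificate chain)
keytool -keystore kafka.server.keystore.jks \
-alias CARoot -importcert -file ca-cert
keytool -keystore kafka.server.keystore.jks \
-alias kafka -importcert -file broker-cert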
SSL Client Configuration
# Producer with SSL
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers='kafka1:9093',
security_protocol='SSL',
ssl_cafile='/path/to/ca.crt',
ssl_certfile='/path/to/client.crt',
ssl_keyfile='/path/to/client.key',
ssl_check_hostname=True
)
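kafka-python builds an `ssl.SSLContext` from the `ssl_*` arguments above, and it also accepts a prebuilt context through its `ssl_context` parameter. The sketch below builds an equivalent context with the standard library and sets TLS 1.2 as the minimum, matching `ssl.enabled.protocols` on the broker. The file paths are the same placeholders used above, so each file is loaded only when a path is given, and `build_client_ssl_context` is a hypothetical name.

```python
import ssl
from typing import Optional

def build_client_ssl_context(cafile: Optional[str] = None,
                             certfile: Optional[str] = None,
                             keyfile: Optional[str] = None) -> ssl.SSLContext:
    # PROTOCOL_TLS_CLIENT turns on hostname checking and CERT_REQUIRED by default
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    if cafile:
        ctx.load_verify_locations(cafile=cafile)   # trust the Kafka CA
    if certfile:
        ctx.load_cert_chain(certfile, keyfile)     # client certificate for mutual TLS
    return ctx

ctx = build_client_ssl_context()
print(ctx.verify_mode == ssl.CERT_REQUIRED, ctx.check_hostname)  # True True
# With a real cluster:
# KafkaProducer(bootstrap_servers='kafka1:9093', security_protocol='SSL',
#               ssl_context=build_client_ssl_context('/path/to/ca.crt',
#                                                    '/path/to/client.crt',
#                                                    '/path/to/client.key'))
```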
SSL Performance Formula
def estimate_ssl_overhead(
    throughput_plain_mbps: float,
    ssl_handshake_ms: float,
    connections_per_sec: int,
    cpu_overhead_factor: float = 1.2,
) -> dict:
    """
    Rough estimate of TLS overhead (a model, not a benchmark).
    Typical TLS CPU overhead is often quoted as 10-30%;
    the default factor of 1.2 models 20% extra CPU work per byte.
    """
    # Seconds of handshake work per second of wall time
    # (e.g. 10 new connections/s x 50 ms = 0.5 s of handshaking per second)
    handshake_seconds_per_sec = connections_per_sec * ssl_handshake_ms / 1000
    # If each byte costs 1.2x the CPU, a CPU-bound broker moves 1/1.2 as many bytes
    throughput_ssl = throughput_plain_mbps / cpu_overhead_factor
    reduction_percent = (1 - throughput_ssl / throughput_plain_mbps) * 100
    return {
        'handshake_seconds_per_sec': handshake_seconds_per_sec,
        'throughput_plain_mbps': throughput_plain_mbps,
        'throughput_ssl_mbps': throughput_ssl,
        'throughput_reduction_percent': reduction_percent,
        'note': 'Larger batches amortize per-request TLS overhead'
    }
# Example
overhead = estimate_ssl_overhead(
    throughput_plain_mbps=500,
    ssl_handshake_ms=50,
    connections_per_sec=10
)
print(f"Throughput reduction: {overhead['throughput_reduction_percent']:.1f}%")
# Throughput reduction: 16.7% (20% more CPU per byte is not a 20% throughput drop)
ℹ️SSL Optimization
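Kafka clients hold long-lived connections, so most handshake cost comes from short-lived clients: reuse one producer or consumer instance per process instead of creating one per request. Keep connections.max.idle.ms on both brokers and clients high enough that idle connections are not closed and then re-handshaked. Use larger batches (batch.size, linger.ms) so per-request TLS overhead is spread over more records. TLS also disables the broker's zero-copy transfer to consumers, because data must be encrypted in user space.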
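Those recommendations map onto kafka-python producer arguments. The values below are illustrative starting points, not tuned recommendations. Against a real cluster, the dict is passed once as `KafkaProducer(**producer_config)` and the producer is reused.

```python
producer_config = {
    'bootstrap_servers': 'kafka1:9093',
    'security_protocol': 'SSL',
    'ssl_cafile': '/path/to/ca.crt',
    # Bigger batches mean fewer requests, spreading TLS and request overhead
    'batch_size': 128 * 1024,      # bytes per partition batch (kafka-python default: 16384)
    'linger_ms': 20,               # wait briefly so batches can fill
    # Keep idle connections open longer so they are not re-handshaked; the broker's
    # own connections.max.idle.ms (10 minutes by default) must be raised as well
    'connections_max_idle_ms': 15 * 60 * 1000,
}
```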
ACL Authorization
ACL Configuration
# broker.properties
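# ZooKeeper-based clusters; KRaft clusters use
# authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
authorizer.class.name=kafka.security.authorizer.AclAuthorizer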
allow.everyone.if.no.acl.found=false
super.users=User:admin
ACL Commands
# Allow producer to write to topic
kafka-acls.sh --bootstrap-server localhost:9093 \
--command-config admin.properties \
--add --allow-principal User:producer \
--operation Write --topic orders
# Allow consumer to read from topic
kafka-acls.sh --bootstrap-server localhost:9093 \
--command-config admin.properties \
--add --allow-principal User:consumer \
--operation Read --topic orders \
--group consumer-group
# Allow admin to manage topics
kafka-acls.sh --bootstrap-server localhost:9093 \
--command-config admin.properties \
--add --allow-principal User:admin \
--operation All --topic '*'
# List ACLs for a topic
kafka-acls.sh --bootstrap-server localhost:9093 \
--command-config admin.properties \
--list --topic orders
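When many services need grants, teams often generate these commands instead of typing them. Below is a hypothetical helper that emits least-privilege `kafka-acls.sh` argument lists for a producer or a consumer. It uses only the flags shown above, and each list can be run with `subprocess.run` against a real cluster.

```python
import shlex
from typing import List, Optional

def acl_commands(principal: str, topic: str, role: str, group: Optional[str] = None,
                 bootstrap: str = 'localhost:9093',
                 command_config: str = 'admin.properties') -> List[List[str]]:
    base = ['kafka-acls.sh', '--bootstrap-server', bootstrap,
            '--command-config', command_config,
            '--add', '--allow-principal', f'User:{principal}']
    if role == 'producer':
        return [base + ['--operation', 'Write', '--topic', topic]]
    if role == 'consumer':
        if not group:
            raise ValueError('consumers also need a --group grant')
        # One command grants Read on both the topic and the consumer group
        return [base + ['--operation', 'Read', '--topic', topic, '--group', group]]
    raise ValueError(f'unknown role: {role}')

for cmd in acl_commands('consumer', 'orders', 'consumer', group='consumer-group'):
    print(shlex.join(cmd))
```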
ACL Resource Types
# ACL resource types and their operations (principals are User:<name>)
acl_resources = {
    'Topic': {
        'operations': ['Read', 'Write', 'Create', 'Delete', 'Alter', 'Describe',
                       'AlterConfigs', 'DescribeConfigs', 'All'],
        'examples': [
            'Allow User:producer to Write to topic orders',
            'Allow User:consumer to Read from topic orders'
        ]
    },
    'Group': {
        'operations': ['Read', 'Describe', 'Delete', 'All'],
        'examples': [
            'Allow User:consumer to Read (join, commit offsets) in group my-group'
        ]
    },
    'Cluster': {
        'operations': ['Create', 'Alter', 'Describe', 'ClusterAction',
                       'AlterConfigs', 'DescribeConfigs', 'IdempotentWrite', 'All'],
        'examples': [
            'Allow User:admin to Create topics',
            'Allow User:admin to AlterConfigs on the cluster'
        ]
    },
    'TransactionalId': {
        'operations': ['Write', 'Describe', 'All'],
        'examples': [
            'Allow User:producer to use transactional.id my-txn'
        ]
    },
    'DelegationToken': {
        'operations': ['Describe', 'All'],
        'examples': [
            'Allow User:app to describe delegation tokens'
        ]
    }
}
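Some operations imply others. In Kafka's authorizer, an ACL for Read, Write, Delete, or Alter also grants Describe, and AlterConfigs also grants DescribeConfigs. An `All` ACL grants every operation on the resource. Here is a small stdlib sketch of that rule, with hypothetical names:

```python
# Operations that a granted ACL operation implicitly allows
IMPLIED = {
    'Read': {'Describe'},
    'Write': {'Describe'},
    'Delete': {'Describe'},
    'Alter': {'Describe'},
    'AlterConfigs': {'DescribeConfigs'},
}

def allows(granted_op: str, requested_op: str) -> bool:
    if granted_op in ('All', requested_op):
        return True
    return requested_op in IMPLIED.get(granted_op, set())

print(allows('Write', 'Describe'))  # True
print(allows('Describe', 'Write'))  # False
```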
ACL Evaluation
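from collections import namedtuple

# Simplified model of the authorizer's decision (not Kafka's actual code;
# ignores implied operations and prefixed resource patterns)
ACL = namedtuple('ACL', ['principal', 'resource', 'operation', 'permission'])

def evaluate_acl(resource, operation, principal, acls,
                 super_users=(), allow_everyone_if_no_acl_found=False):
    """
    Evaluate if a principal has access to a resource.
    ACL evaluation order:
    1. Super users always have access
    2. If no ACLs exist for the resource, allow only when
       allow.everyone.if.no.acl.found=true
    3. Among ACLs matching the principal (or User:*), the resource (or its
       '*' wildcard) and the operation (or All), any DENY wins
    4. Otherwise allow only if a matching ALLOW exists
    """
    if principal in super_users:
        return True
    resource_type = resource.split(':', 1)[0]
    resource_acls = [a for a in acls
                     if a.resource in (resource, f'{resource_type}:*')]
    if not resource_acls:
        return allow_everyone_if_no_acl_found
    matching = [a for a in resource_acls
                if a.principal in (principal, 'User:*')
                and a.operation in (operation, 'All')]
    if any(a.permission == 'DENY' for a in matching):
        return False
    return any(a.permission == 'ALLOW' for a in matching)

# Example ACLs
acls = [
    ACL('User:producer', 'Topic:orders', 'Write', 'ALLOW'),
    ACL('User:consumer', 'Topic:orders', 'Read', 'ALLOW'),
    ACL('User:admin', 'Cluster:*', 'All', 'ALLOW'),
]
# Evaluation
print(evaluate_acl('Topic:orders', 'Write', 'User:producer', acls))  # True
print(evaluate_acl('Topic:orders', 'Read', 'User:producer', acls))   # False
print(evaluate_acl('Cluster:*', 'All', 'User:admin', acls))          # True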
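Real Kafka ACLs attach to resource patterns, not only to exact names. A LITERAL pattern names one resource, or uses `*` to cover all resources of that type. A PREFIXED pattern covers every name starting with a given string and is set with `kafka-acls.sh --resource-pattern-type prefixed`. The stdlib sketch below shows the matching rule, with hypothetical topic names:

```python
def pattern_matches(pattern_type: str, pattern_name: str, resource_name: str) -> bool:
    if pattern_type == 'LITERAL':
        # Exact name, or '*' for every resource of that type
        return pattern_name in ('*', resource_name)
    if pattern_type == 'PREFIXED':
        return resource_name.startswith(pattern_name)
    raise ValueError(f'unsupported pattern type: {pattern_type}')

print(pattern_matches('PREFIXED', 'orders-', 'orders-eu'))  # True
print(pattern_matches('LITERAL', 'orders', 'orders-eu'))    # False
print(pattern_matches('LITERAL', '*', 'payments'))          # True
```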
⚠️ACL Best Practice
Keep allow.everyone.if.no.acl.found=false (the default) and grant permissions explicitly. Access is then denied unless an ACL allows it, which is the secure approach.
Encryption at Rest
Broker-Level Encryption
# Apache Kafka has no built-in broker-side encryption at rest;
# encrypt the disks, or encrypt payloads in the client
# Option 1: Filesystem-level encryption (dm-crypt/LUKS on Linux)
# Encrypt the entire data directory (luksFormat erases the device)
cryptsetup luksFormat /dev/sdb1
cryptsetup open /dev/sdb1 kafka-data
mkfs.ext4 /dev/mapper/kafka-data
mount /dev/mapper/kafka-data /var/kafka/data
# then set log.dirs=/var/kafka/data in server.properties
# Option 2: Encrypted volumes from the infrastructure provider
# (e.g. AWS EBS encryption with KMS keys); vendor distributions may offer
# further options, so check their documentation for exact settings
Application-Level Encryption
import json
from cryptography.fernet import Fernet
from kafka import KafkaConsumer, KafkaProducer

class EncryptedProducer:
    """
    Encrypt message values before producing to Kafka.
    """
    def __init__(self, encryption_key, bootstrap_servers='localhost:9092'):
        self.cipher = Fernet(encryption_key)
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            key_serializer=lambda k: k.encode() if isinstance(k, str) else k,
            value_serializer=lambda v: v  # values are already encrypted bytes
        )

    def send_encrypted(self, topic, key, value):
        """Serialize value as JSON, encrypt it, and send (the key stays plaintext for partitioning)"""
        encrypted = self.cipher.encrypt(json.dumps(value).encode())
        self.producer.send(topic, key=key, value=encrypted)

    def send_batch_encrypted(self, topic, messages):
        """Send a batch of (key, value) pairs, then block until delivered"""
        for key, value in messages:
            self.send_encrypted(topic, key, value)
        self.producer.flush()

class EncryptedConsumer:
    """
    Consumer that decrypts messages from Kafka.
    """
    def __init__(self, encryption_key, topic='encrypted-topic',
                 bootstrap_servers='localhost:9092'):
        self.cipher = Fernet(encryption_key)
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=bootstrap_servers,
            value_deserializer=lambda v: v  # decrypted in consume_decrypted
        )

    def consume_decrypted(self):
        """Consume and decrypt messages"""
        for message in self.consumer:
            decrypted = self.cipher.decrypt(message.value)
            value = json.loads(decrypted.decode())
            yield message.key, value

# Generate an encryption key once; producers and consumers must share it
key = Fernet.generate_key()
# Store key securely (Vault, KMS, etc.)
Field-Level Encryption
from cryptography.fernet import Fernet

class FieldLevelEncryption:
    """
    Encrypt specific fields in messages.
    key_registry is any object with get_key(field) -> Fernet key.
    """
    def __init__(self, key_registry):
        self.key_registry = key_registry

    def encrypt_fields(self, message, fields_to_encrypt):
        """
        Encrypt specific fields in message.
        Useful for PII: SSN, email, phone, etc.
        Values are stringified first, so decrypted fields come back as str.
        """
        encrypted = message.copy()
        for field in fields_to_encrypt:
            if field in encrypted:
                cipher = Fernet(self.key_registry.get_key(field))
                encrypted[field] = cipher.encrypt(
                    str(encrypted[field]).encode()
                ).decode()
        return encrypted

    def decrypt_fields(self, message, fields_to_decrypt):
        """Decrypt specific fields"""
        decrypted = message.copy()
        for field in fields_to_decrypt:
            if field in decrypted:
                cipher = Fernet(self.key_registry.get_key(field))
                decrypted[field] = cipher.decrypt(
                    decrypted[field].encode()
                ).decode()
        return decrypted

# Example usage
class InMemoryKeyRegistry:
    """Hypothetical stand-in for a KMS/Vault-backed registry: one key per field."""
    def __init__(self):
        self._keys = {}
    def get_key(self, field):
        if field not in self._keys:
            self._keys[field] = Fernet.generate_key()
        return self._keys[field]

encryption = FieldLevelEncryption(InMemoryKeyRegistry())
# Encrypt before producing
message = {
    'user_id': '12345',
    'ssn': '123-45-6789',
    'email': 'user@example.com',
    'amount': 100.00
}
encrypted_msg = encryption.encrypt_fields(message, ['ssn', 'email'])
# ssn and email are encrypted, user_id and amount are plaintext
# producer.send('user-events', value=encrypted_msg)  # producer with a JSON value_serializer
# Consumer decrypts (here the encrypted message stands in for the received one)
received_msg = encrypted_msg
decrypted_msg = encryption.decrypt_fields(received_msg, ['ssn', 'email'])
ℹ️Encryption Strategy
- Transport: Use SSL/TLS for all data in transit
- Disk: Use filesystem encryption for data at rest
- Field-level: Encrypt sensitive fields (PII) at application level
- Key management: Use a KMS (AWS KMS, HashiCorp Vault) for key rotation
Key Management
from cryptography.fernet import Fernet, InvalidToken

class KeyManagementService:
    """
    Envelope-encryption key management for Kafka payload encryption.
    Data keys (Fernet keys) are wrapped by a KMS master key and only the
    wrapped form is kept. kms_client is assumed to be a boto3 KMS client.
    """
    def __init__(self, kms_client, master_key_id):
        self.kms = kms_client
        self.master_key_id = master_key_id
        self.wrapped_keys = {}  # key_id -> CiphertextBlob (persist this in practice)
        self.key_cache = {}     # key_id -> Fernet (plaintext key, memory only)

    def get_encryption_key(self, key_id):
        """
        Unwrap a data key with KMS.
        Cache keys to avoid KMS calls on every message.
        """
        if key_id not in self.key_cache:
            response = self.kms.decrypt(CiphertextBlob=self.wrapped_keys[key_id])
            self.key_cache[key_id] = Fernet(response['Plaintext'])
        return self.key_cache[key_id]

    def rotate_key(self, key_id):
        """
        Create a new data key under key_id.
        Old messages stay encrypted with the old key, so keep that key
        available under another id (e.g. a versioned id) until they expire.
        """
        new_key = Fernet.generate_key()
        # KMS encrypt stores nothing; it returns the wrapped key for us to persist
        response = self.kms.encrypt(KeyId=self.master_key_id, Plaintext=new_key)
        self.wrapped_keys[key_id] = response['CiphertextBlob']
        # Invalidate the cached plaintext key
        self.key_cache.pop(key_id, None)

    def decrypt_with_key_rotation(self, encrypted_data, key_id, old_key_id=None):
        """
        Handle decryption during key rotation.
        Try the new key first, fall back to the old key.
        """
        try:
            return self.get_encryption_key(key_id).decrypt(encrypted_data)
        except InvalidToken:
            if old_key_id:
                return self.get_encryption_key(old_key_id).decrypt(encrypted_data)
            raise
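Falling back from one key to another works for two versions. A consumer can instead pick the right key directly if the producer records which key encrypted each record. kafka-python's `send()` accepts `headers` as a list of `(str, bytes)` tuples, and consumed records expose the same list as `message.headers`. The header name `key-id` and the versioned ids below are hypothetical conventions. The lookup uses plain dicts so it runs without a broker or KMS.

```python
from typing import Dict, List, Tuple

KEY_ID_HEADER = 'key-id'  # hypothetical header name

def encryption_headers(key_id: str) -> List[Tuple[str, bytes]]:
    # Producer side: producer.send(topic, value=ciphertext, headers=encryption_headers(key_id))
    return [(KEY_ID_HEADER, key_id.encode())]

def key_id_from_headers(headers: List[Tuple[str, bytes]]) -> str:
    # Consumer side: pass message.headers
    for name, value in headers:
        if name == KEY_ID_HEADER:
            return value.decode()
    raise KeyError('record has no key-id header')

# Versioned ids keep old keys addressable after rotation (placeholder key bytes)
keys: Dict[str, bytes] = {'orders-v1': b'old-key', 'orders-v2': b'new-key'}
headers = encryption_headers('orders-v1')
print(keys[key_id_from_headers(headers)])  # b'old-key'
```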
Audit Logging
import json
import time
from kafka import KafkaProducer

class AuditLogger:
    """
    Application-level audit logging for Kafka operations.
    Broker-side ACL decisions are logged separately by the authorizer
    (the kafka.authorizer.logger logger).
    """
    def __init__(self, audit_topic, bootstrap_servers='localhost:9092'):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode()
        )
        self.audit_topic = audit_topic

    def log_operation(self, user, operation, resource, result,
                      source_ip=None, client_id=None):
        """Log a security-relevant operation"""
        audit_record = {
            'timestamp': int(time.time() * 1000),  # epoch milliseconds
            'user': user,
            'operation': operation,
            'resource': resource,
            'result': result,
            'source_ip': source_ip,  # supplied by the caller
            'client_id': client_id   # Kafka client.id, if known
        }
        self.producer.send(self.audit_topic, value=audit_record)

    def log_authentication(self, user, method, success,
                           failure_reason=None, source_ip=None):
        """Log authentication attempt"""
        self.log_operation(
            user=user,
            operation='AUTHENTICATE',
            resource='broker',
            result={
                'success': success,
                'method': method,
                'failure_reason': failure_reason
            },
            source_ip=source_ip
        )

    def log_authorization(self, user, operation, resource, granted):
        """Log authorization check"""
        self.log_operation(
            user=user,
            operation=f'AUTHORIZE:{operation}',
            resource=resource,
            result={'granted': granted}
        )
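These audit records can support failed-authentication monitoring. The sketch below is a sliding-window counter over records shaped like the ones `log_authentication` produces. The class name, threshold, and window are illustrative, and the sample records are hand-written inputs, not Kafka output.

```python
from collections import defaultdict, deque
from typing import Deque, Dict

class FailedAuthMonitor:
    """Flag users with too many failed authentications inside a time window."""

    def __init__(self, max_failures: int = 5, window_ms: int = 60_000):
        self.max_failures = max_failures
        self.window_ms = window_ms
        self.failures: Dict[str, Deque[int]] = defaultdict(deque)

    def observe(self, record: dict) -> bool:
        """Return True if this record puts its user at or over the threshold."""
        if record['operation'] != 'AUTHENTICATE' or record['result']['success']:
            return False
        times = self.failures[record['user']]
        times.append(record['timestamp'])
        # Drop failures that fell out of the sliding window
        while times and record['timestamp'] - times[0] > self.window_ms:
            times.popleft()
        return len(times) >= self.max_failures

monitor = FailedAuthMonitor(max_failures=3, window_ms=10_000)
alerts = [
    monitor.observe({'timestamp': t, 'user': 'alice', 'operation': 'AUTHENTICATE',
                     'result': {'success': False}})
    for t in (1_000, 2_000, 3_000)
]
print(alerts)  # [False, False, True]
```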
⚠️Audit Requirements
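Compliance frameworks such as PCI DSS, HIPAA, and SOC 2 expect audit logging. Log all authentication attempts, authorization decisions, and administrative operations. Retention periods differ by framework (PCI DSS, for example, requires at least 1 year), so retain logs for at least as long as the strictest applicable framework requires.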
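If the audit records live in a Kafka topic, the retention requirement becomes the topic's `retention.ms` setting. The one-year figure below follows the callout above, the topic name `audit-log` is hypothetical, and the unit is milliseconds, which makes the value easy to get wrong by a factor of 1000.

```python
from datetime import timedelta

def retention_ms(days: int) -> int:
    # retention.ms is expressed in milliseconds
    return int(timedelta(days=days).total_seconds() * 1000)

one_year_ms = retention_ms(365)
print(one_year_ms)  # 31536000000
# e.g. kafka-configs.sh --bootstrap-server localhost:9093 --command-config admin.properties \
#      --entity-type topics --entity-name audit-log --alter --add-config retention.ms=31536000000
```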
Security Configuration Matrix
| Component | Configuration | Purpose |
|---|---|---|
| Broker | listeners=SASL_SSL | Authentication + Encryption |
| Broker | authorizer.class.name=AclAuthorizer (StandardAuthorizer on KRaft) | Authorization |
| Broker | ssl.client.auth=required | Mutual TLS |
| Producer | security_protocol=SASL_SSL | Client auth + encryption |
| Consumer | security_protocol=SASL_SSL | Client auth + encryption |
| Admin | --command-config admin.properties | Admin credentials |
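The `--command-config admin.properties` file holds Java client settings for the admin tools. The sketch below renders one for a SASL_SSL listener using SCRAM. The property names are standard Kafka client configs. The function name, credentials, and truststore path are placeholders.

```python
def render_client_properties(username: str, password: str,
                             truststore: str, truststore_password: str,
                             mechanism: str = 'SCRAM-SHA-256') -> str:
    # Inline JAAS config for the SCRAM login module
    jaas = ('org.apache.kafka.common.security.scram.ScramLoginModule required '
            f'username="{username}" password="{password}";')
    props = {
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': mechanism,
        'sasl.jaas.config': jaas,
        'ssl.truststore.location': truststore,
        'ssl.truststore.password': truststore_password,
    }
    return '\n'.join(f'{key}={value}' for key, value in props.items()) + '\n'

print(render_client_properties('admin', 'admin-secret',
                               '/var/kafka/ssl/kafka.client.truststore.jks',
                               'truststore-password'))
```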
Security Best Practices
# Production security checklist
security_checklist = {
'Authentication': [
'Use SASL/SCRAM or SASL/GSSAPI (not PLAIN)',
'Rotate credentials regularly',
'Use separate credentials per application',
'Enable mutual TLS (ssl.client.auth=required)'
],
'Encryption': [
'Enable SSL/TLS for all listeners',
'Use TLS 1.2 or higher',
'Enable encryption at rest for sensitive data',
'Use field-level encryption for PII'
],
'Authorization': [
'Set allow.everyone.if.no.acl.found=false',
'Grant minimal required permissions',
'Use topic-level ACLs, not wildcards',
'Regularly audit ACLs'
],
'Monitoring': [
'Enable audit logging',
'Monitor failed authentication attempts',
'Alert on unauthorized access attempts',
'Track permission changes'
]
}
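Parts of this checklist can be checked automatically against a broker's `server.properties`. Below is a minimal, hypothetical linter covering a few items. It handles simple `key=value` lines only (no line continuations or `:` separators), and it assumes listener names equal their protocol names, with no `listener.security.protocol.map`. Treat it as a starting point, not a complete audit.

```python
def parse_properties(text: str) -> dict:
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith(('#', '!')) or '=' not in line:
            continue
        key, _, value = line.partition('=')
        props[key.strip()] = value.strip()
    return props

def lint_broker_config(props: dict) -> list:
    findings = []
    for listener in filter(None, props.get('listeners', '').split(',')):
        protocol = listener.split('://')[0].strip().upper()
        if protocol in ('PLAINTEXT', 'SASL_PLAINTEXT'):
            findings.append(f'unencrypted listener: {listener.strip()}')
    mechanisms = [m.strip() for m in props.get('sasl.enabled.mechanisms', '').split(',')]
    if 'PLAIN' in mechanisms:
        findings.append('SASL/PLAIN enabled: prefer SCRAM or GSSAPI')
    if props.get('allow.everyone.if.no.acl.found', 'false').lower() == 'true':
        findings.append('allow.everyone.if.no.acl.found=true')
    if 'authorizer.class.name' not in props:
        findings.append('no authorizer configured')
    protocols = [p.strip() for p in props.get('ssl.enabled.protocols', '').split(',')]
    if any(p in ('TLSv1', 'TLSv1.1') for p in protocols):
        findings.append('TLS below 1.2 enabled')
    return findings

sample = """
listeners=SASL_PLAINTEXT://0.0.0.0:9092
sasl.enabled.mechanisms=PLAIN
allow.everyone.if.no.acl.found=false
"""
for finding in lint_broker_config(parse_properties(sample)):
    print(finding)
```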
ℹ️Defense in Depth
Never rely on a single security layer. Combine authentication, authorization, encryption, and auditing. If one layer is compromised, the others provide protection.