Kafka Security & ACLs
Difficulty: Senior Level | Companies: LinkedIn, Uber, Netflix, Spotify, Confluent
Kafka security encompasses authentication, authorization, encryption, and audit logging. Understanding these mechanisms is critical for production deployments.
Security Architecture
Architecture Diagram
Security Layers:
├── Authentication (SASL)
│   ├── SASL/PLAIN
│   ├── SASL/SCRAM
│   ├── SASL/GSSAPI (Kerberos)
│   └── SASL/OAUTHBEARER
├── Authorization (ACLs)
│   ├── Topic-level permissions
│   ├── Cluster-level permissions
│   └── Consumer group permissions
├── Encryption (SSL/TLS)
│   ├── In-transit encryption
│   └── Certificate management
└── Audit Logging
    ├── Access logs
    └── Operation logs
SSL/TLS Configuration
Broker Configuration
# server.properties
listeners=SSL://kafka1:9093
advertised.listeners=SSL://kafka1:9093
# SSL settings
ssl.keystore.location=/etc/kafka/secrets/kafka.server.keystore.jks
ssl.keystore.password=${keystore.password}
ssl.key.password=${key.password}
ssl.truststore.location=/etc/kafka/secrets/kafka.server.truststore.jks
ssl.truststore.password=${truststore.password}
# Require client certificates (mutual TLS); the client's principal is then derived from its certificate DN
ssl.client.auth=required
# Brokers also talk to each other over SSL
security.inter.broker.protocol=SSL
Client Configuration
# Producer properties
bootstrap.servers=kafka1:9093
security.protocol=SSL
ssl.truststore.location=/etc/kafka/client/truststore.jks
ssl.truststore.password=${truststore.password}
ssl.keystore.location=/etc/kafka/client/keystore.jks
ssl.keystore.password=${keystore.password}
Java Client with SSL
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9093");
props.put("security.protocol", "SSL");
// Truststore holds the CA that signed the broker certificate
props.put("ssl.truststore.location", "/etc/kafka/client/truststore.jks");
props.put("ssl.truststore.password", "changeit");
// Keystore holds the client certificate, needed because the broker sets ssl.client.auth=required
props.put("ssl.keystore.location", "/etc/kafka/client/keystore.jks");
props.put("ssl.keystore.password", "changeit");
// Serializers are mandatory; KafkaProducer throws a ConfigException without them
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
SASL/SCRAM Authentication
Broker Configuration
# server.properties
listeners=SASL_SSL://kafka1:9094
advertised.listeners=SASL_SSL://kafka1:9094
# SASL configuration
sasl.enabled.mechanisms=SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
security.inter.broker.protocol=SASL_SSL
# JAAS configuration for the broker side of SCRAM (listener- and mechanism-prefixed property);
# brokers use these credentials for inter-broker connections, so the admin SCRAM
# credential must already exist before the brokers start
listener.name.sasl_ssl.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="admin-secret";
Create SCRAM Credentials
# Create the producer user. Port 9094 is the SASL_SSL listener, so the tool itself must
# authenticate: pass the admin credentials in a properties file via --command-config
# (/etc/kafka/admin.properties is an assumed path)
kafka-configs.sh --bootstrap-server kafka1:9094 \
--command-config /etc/kafka/admin.properties \
--alter --add-config 'SCRAM-SHA-512=[password=producer-secret]' \
--entity-type users --entity-name producer
# Create the consumer user
kafka-configs.sh --bootstrap-server kafka1:9094 \
--command-config /etc/kafka/admin.properties \
--alter --add-config 'SCRAM-SHA-512=[password=consumer-secret]' \
--entity-type users --entity-name consumer
Client Configuration
# Producer with SASL/SCRAM
bootstrap.servers=kafka1:9094
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="producer" \
password="producer-secret";
⚠️ Important: Always use SASL_SSL (not plain SASL_PLAINTEXT) in production so that credentials are encrypted in transit.
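To make concrete what SCRAM stores and exchanges, and why that separates it from SASL/PLAIN (which keeps cleartext passwords in a JAAS user list), here is an added example that is not code from this page or from the Kafka project: the SCRAM-SHA-512 credential derivation, client proof and broker-side verification, written against Python's standard library. The salt, auth message, user name and password are constructed values; the 4096 iteration count is Kafka's default.

```python
import hashlib
import hmac
import os
from typing import Optional

# SCRAM-SHA-512 as Kafka stores and verifies it (RFC 5802 with SHA-512, RFC 7677)
HASH = "sha512"
ITERATIONS = 4096  # Kafka's default iteration count for SCRAM credentials


def derive_credential(password: str, salt: Optional[bytes] = None,
                      iterations: int = ITERATIONS) -> dict:
    """Broker-side record written by kafka-configs.sh: salt, iterations, StoredKey, ServerKey.
    The cleartext password itself is never stored, unlike SASL/PLAIN's JAAS user list."""
    salt = salt if salt is not None else os.urandom(16)
    salted_password = hashlib.pbkdf2_hmac(HASH, password.encode(), salt, iterations)
    client_key = hmac.new(salted_password, b"Client Key", HASH).digest()
    server_key = hmac.new(salted_password, b"Server Key", HASH).digest()
    stored_key = hashlib.new(HASH, client_key).digest()
    return {"salt": salt, "iterations": iterations,
            "stored_key": stored_key, "server_key": server_key}


def client_proof(password: str, salt: bytes, iterations: int, auth_message: bytes) -> bytes:
    """Client side: ClientProof = ClientKey XOR HMAC(StoredKey, AuthMessage).
    The client only needs the salt and iteration count, which the broker sends in the clear."""
    salted_password = hashlib.pbkdf2_hmac(HASH, password.encode(), salt, iterations)
    client_key = hmac.new(salted_password, b"Client Key", HASH).digest()
    stored_key = hashlib.new(HASH, client_key).digest()
    client_signature = hmac.new(stored_key, auth_message, HASH).digest()
    return bytes(a ^ b for a, b in zip(client_key, client_signature))


def verify_client_proof(credential: dict, auth_message: bytes, proof: bytes) -> bool:
    """Broker side: XOR the signature back out to recover ClientKey, then check H(ClientKey) == StoredKey.
    Because auth_message contains both parties' per-session nonces, a proof from one
    session does not verify in another."""
    client_signature = hmac.new(credential["stored_key"], auth_message, HASH).digest()
    recovered_client_key = bytes(a ^ b for a, b in zip(proof, client_signature))
    return hmac.compare_digest(hashlib.new(HASH, recovered_client_key).digest(),
                               credential["stored_key"])


def server_signature(credential: dict, auth_message: bytes) -> bytes:
    """Broker proves it holds the credential too (mutual authentication): HMAC(ServerKey, AuthMessage)."""
    return hmac.new(credential["server_key"], auth_message, HASH).digest()


if __name__ == "__main__":
    # Constructed example values; a real AuthMessage is assembled from the SCRAM client-first,
    # server-first and client-final messages, including fresh nonces from both sides.
    cred = derive_credential("producer-secret", salt=b"0123456789abcdef")
    auth_message = b"n=producer,r=cnonce,r=cnoncesnonce,s=MDEyMzQ1Njc4OWFiY2RlZg==,i=4096,c=biws,r=cnoncesnonce"
    proof = client_proof("producer-secret", cred["salt"], cred["iterations"], auth_message)
    print("valid password accepted:", verify_client_proof(cred, auth_message, proof))
    bad = client_proof("wrong-password", cred["salt"], cred["iterations"], auth_message)
    print("wrong password rejected:", not verify_client_proof(cred, auth_message, bad))
```

The point to take away for the interview question: a leaked SCRAM credential store yields salted, iterated hashes rather than passwords, and the password never crosses the wire even over a compromised TLS session, whereas SASL/PLAIN sends it verbatim and relies entirely on TLS.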
ACL Configuration
Enable ACLs
# server.properties
# AclAuthorizer is the ZooKeeper-backed authorizer; KRaft clusters use
# org.apache.kafka.metadata.authorizer.StandardAuthorizer with the same ACL semantics
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# Super users bypass ACL evaluation entirely; keep this list to the broker/admin identity
super.users=User:admin
# Default deny: a resource with no ACL is inaccessible to everyone except super users
allow.everyone.if.no.acl.found=false
ACL Commands
# kafka-acls.sh connects as an admin client, so over a secured listener it also needs
# --command-config with the admin credentials (omitted below for brevity). On an SSL listener
# with client auth the principal is the certificate DN unless ssl.principal.mapping.rules maps it.
# Allow producer to write to topic
kafka-acls.sh --bootstrap-server kafka1:9093 \
--add --allow-principal User:producer \
--operation Write --topic orders
# Allow consumer to read from topic
kafka-acls.sh --bootstrap-server kafka1:9093 \
--add --allow-principal User:consumer \
--operation Read --topic orders
# Allow consumer group
kafka-acls.sh --bootstrap-server kafka1:9093 \
--add --allow-principal User:consumer \
--operation Read --group order-processor
# Allow admin to create topics (Create on the wildcard topic resource; the resource type
# is given by the --topic/--group/--cluster/--transactional-id flag, there is no --resource-type)
kafka-acls.sh --bootstrap-server kafka1:9093 \
--add --allow-principal User:admin \
--operation Create --topic '*'
# List ACLs
kafka-acls.sh --bootstrap-server kafka1:9093 \
--list --topic orders
# Remove ACL
kafka-acls.sh --bootstrap-server kafka1:9093 \
--remove --allow-principal User:producer \
--operation Write --topic orders
ACL Patterns
# Wildcard and prefix patterns
--topic '*'                                        # All topics (the literal wildcard resource)
--topic orders --resource-pattern-type prefixed    # Topics whose name starts with "orders"
--group '*'                                        # All consumer groups
# Note: --topic 'orders*' without --resource-pattern-type prefixed is a LITERAL pattern and
# only matches a topic literally named "orders*"
# Resource types (each type has its own flag; the value is the resource name)
--topic <name>
--group <name>
--cluster
--transactional-id <name>
# Operations
--operation Read
--operation Write
--operation Create
--operation Delete
--operation Alter
--operation Describe
--operation All
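The following is an added example, not part of the original page or of the Kafka code base: a small Python model of how AclAuthorizer evaluates the ACLs created above, covering wildcard and prefixed patterns, Deny precedence over Allow, super.users and the allow.everyone.if.no.acl.found fallback. The User:etl principal, the orders-pii topic and the host addresses are constructed for the demonstration.

```python
from dataclasses import dataclass
from typing import FrozenSet, List, Set

ALL = "All"
# An Allow ACL for these operations also grants the implied one (AclAuthorizer behaviour):
# Read, Write, Delete or Alter on a resource imply Describe; AlterConfigs implies DescribeConfigs.
IMPLIED_BY = {"Describe": {"Read", "Write", "Delete", "Alter"},
              "DescribeConfigs": {"AlterConfigs"}}


@dataclass(frozen=True)
class Acl:
    principal: str          # "User:producer", or "User:*" for every principal
    host: str               # client address, or "*"
    operation: str          # "Read", "Write", ..., or "All"
    permission: str         # "Allow" or "Deny"
    resource_type: str      # "Topic", "Group", "Cluster", "TransactionalId"
    pattern: str            # resource name, or "*" (the literal wildcard resource)
    pattern_type: str = "LITERAL"  # "LITERAL" or "PREFIXED"

    def covers_resource(self, resource_type: str, name: str) -> bool:
        if self.resource_type != resource_type:
            return False
        if self.pattern_type == "PREFIXED":
            return name.startswith(self.pattern)
        return self.pattern in ("*", name)

    def covers_request(self, principal: str, host: str, operations: Set[str]) -> bool:
        return (self.principal in (principal, "User:*")
                and self.host in (host, "*")
                and self.operation in operations)


@dataclass
class Authorizer:
    acls: List[Acl]
    super_users: FrozenSet[str] = frozenset()
    allow_everyone_if_no_acl_found: bool = False

    def authorize(self, principal: str, host: str, operation: str,
                  resource_type: str, name: str) -> bool:
        # 1. super.users bypass ACL evaluation entirely
        if principal in self.super_users:
            return True
        # 2. collect every ACL whose pattern covers this resource, regardless of principal
        resource_acls = [a for a in self.acls if a.covers_resource(resource_type, name)]
        # 3. no ACL at all for the resource: fall back to allow.everyone.if.no.acl.found
        if not resource_acls:
            return self.allow_everyone_if_no_acl_found
        # 4. any matching Deny wins over any Allow (Deny does not use implied operations)
        if any(a.permission == "Deny" and a.covers_request(principal, host, {operation, ALL})
               for a in resource_acls):
            return False
        # 5. otherwise an Allow for the operation, for All, or for an operation that implies it
        allowed = {operation, ALL} | IMPLIED_BY.get(operation, set())
        return any(a.permission == "Allow" and a.covers_request(principal, host, allowed)
                   for a in resource_acls)


def example_authorizer(allow_everyone: bool = False) -> Authorizer:
    """The ACLs from the commands above plus a constructed User:etl with a prefixed pattern and an explicit Deny."""
    acls = [
        Acl("User:producer", "*", "Write", "Allow", "Topic", "orders"),
        Acl("User:consumer", "*", "Read", "Allow", "Topic", "orders"),
        Acl("User:consumer", "*", "Read", "Allow", "Group", "order-processor"),
        Acl("User:admin", "*", "Create", "Allow", "Topic", "*"),
        Acl("User:etl", "*", "Read", "Allow", "Topic", "orders", "PREFIXED"),  # constructed
        Acl("User:etl", "*", "Read", "Deny", "Topic", "orders-pii"),           # constructed
    ]
    return Authorizer(acls, super_users=frozenset({"User:admin"}),
                      allow_everyone_if_no_acl_found=allow_everyone)


if __name__ == "__main__":
    az = example_authorizer()
    for args in [("User:producer", "10.0.0.5", "Write", "Topic", "orders"),
                 ("User:producer", "10.0.0.5", "Read", "Topic", "orders"),
                 ("User:etl", "10.0.0.9", "Read", "Topic", "orders-2024"),
                 ("User:etl", "10.0.0.9", "Read", "Topic", "orders-pii"),
                 ("User:etl", "10.0.0.9", "Read", "Topic", "payments")]:
        print(args, "->", az.authorize(*args))
```

One consequence worth checking in your own cluster: the wildcard Create ACL for User:admin counts as "an ACL for the resource" for every topic, so even with allow.everyone.if.no.acl.found=true a topic such as payments is not opened up to everyone, whereas a consumer group with no ACL at all would be. That is why the default-deny setting, rather than a wildcard ACL, is the right way to close the cluster.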
Security Best Practices
1. Principle of Least Privilege
# Producer: only write to specific topic
kafka-acls.sh --add --allow-principal User:producer \
--operation Write --topic orders
# Consumer: only read from specific topic and group
kafka-acls.sh --add --allow-principal User:consumer \
--operation Read --topic orders
kafka-acls.sh --add --allow-principal User:consumer \
--operation Read --group order-processor
# Admin: full access to specific topics (the resource type is implied by --topic)
kafka-acls.sh --add --allow-principal User:admin \
--operation All --topic orders
2. Audit Logging
# server.properties: the authorizer must be enabled for anything to be audited
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# log4j.properties: Kafka has no audit.log.enabled switch; the authorizer writes its audit
# trail through the kafka.authorizer.logger log4j logger. Denied requests are logged at INFO,
# allowed requests at DEBUG, so raise the level to DEBUG to record both.
log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender
log4j.additivity.kafka.authorizer.logger=false
log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
# Note: log.dirs in server.properties is the partition data directory, not a log file location
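As an added example (not from this page or from Kafka itself), the script below parses authorizer audit lines in the format produced by the log4j layout above and summarises denied requests per principal and host, the check a detection engineer runs after an ACL change or when a client starts failing. The sample lines are constructed to match that format; the principals, hosts and resource names in them are invented.

```python
import re
from collections import Counter
from typing import Dict, Iterable, List, Optional, Set, Tuple

# Shape of one kafka-authorizer.log line with the default PatternLayout "[%d] %p %m (%c)%n"
LINE_RE = re.compile(
    r"\[(?P<ts>[^\]]+)\] (?P<level>\w+) Principal = (?P<principal>\S+) is (?P<result>Allowed|Denied) "
    r"Operation = (?P<operation>\w+) from host = (?P<host>\S+) on resource = (?P<resource>\S+)"
    r"(?: for request = (?P<request>\w+))?",
    re.IGNORECASE,
)

# Constructed sample lines in that format (not taken from a real broker)
SAMPLE_LOG = """\
[2024-03-01 10:15:01,001] INFO Principal = User:etl is Denied Operation = Read from host = 10.0.0.77 on resource = Topic:LITERAL:payments for request = Fetch with resourceRefCount = 1 (kafka.authorizer.logger)
[2024-03-01 10:15:01,250] INFO Principal = User:etl is Denied Operation = Read from host = 10.0.0.77 on resource = Topic:LITERAL:payments for request = Fetch with resourceRefCount = 1 (kafka.authorizer.logger)
[2024-03-01 10:15:02,013] INFO Principal = User:etl is Denied Operation = Describe from host = 10.0.0.77 on resource = Topic:LITERAL:payments for request = Metadata with resourceRefCount = 1 (kafka.authorizer.logger)
[2024-03-01 10:15:02,900] INFO Principal = User:etl is Denied Operation = Read from host = 10.0.0.77 on resource = Group:LITERAL:finance-readers for request = JoinGroup with resourceRefCount = 1 (kafka.authorizer.logger)
[2024-03-01 10:15:03,100] INFO Principal = User:producer is Denied Operation = Read from host = 10.0.0.5 on resource = Topic:LITERAL:orders for request = Fetch with resourceRefCount = 1 (kafka.authorizer.logger)
[2024-03-01 10:15:03,200] DEBUG Principal = User:producer is Allowed Operation = Write from host = 10.0.0.5 on resource = Topic:LITERAL:orders for request = Produce with resourceRefCount = 1 (kafka.authorizer.logger)
[2024-03-01 10:15:03,300] INFO [GroupCoordinator 1]: Stabilized group order-processor (kafka.coordinator.group.GroupCoordinator)
"""


def parse_line(line: str) -> Optional[Dict[str, str]]:
    """Return the fields of an authorizer line, or None for lines from other loggers."""
    m = LINE_RE.search(line)
    return m.groupdict() if m else None


def denials(lines: Iterable[str]) -> List[Dict[str, str]]:
    """Only the Denied decisions; Allowed ones are normally absent at INFO level anyway."""
    return [f for f in map(parse_line, lines) if f and f["result"].lower() == "denied"]


def denials_by_principal_host(lines: Iterable[str]) -> Counter:
    """Count denied requests per (principal, host): the first thing to look at after an ACL change."""
    return Counter((f["principal"], f["host"]) for f in denials(lines))


def noisy_clients(lines: Iterable[str], threshold: int) -> List[Tuple[str, str]]:
    """(principal, host) pairs denied at least `threshold` times, most frequent first."""
    counts = denials_by_principal_host(lines)
    return [pair for pair, n in counts.most_common() if n >= threshold]


def denied_resources(lines: Iterable[str], principal: str) -> Dict[str, Set[str]]:
    """Which resources a principal was refused, grouped by resource type (Topic, Group, ...).
    The resource field is Type:PatternType:Name, as written by the authorizer."""
    out: Dict[str, Set[str]] = {}
    for f in denials(lines):
        if f["principal"] == principal:
            rtype, _pattern_type, name = f["resource"].split(":", 2)
            out.setdefault(rtype, set()).add(name)
    return out


if __name__ == "__main__":
    lines = SAMPLE_LOG.splitlines()
    print(denials_by_principal_host(lines))
    print(noisy_clients(lines, threshold=3))
    print(denied_resources(lines, "User:etl"))
```

A single denial usually means a missing ACL after a deployment; a burst of denials from one principal and host across resources it has never been granted is what you want an alert on, so the threshold in noisy_clients is the knob to tune against your cluster's normal error rate.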
3. Network Security
# Bind listeners to the intended network interfaces rather than 0.0.0.0, and give
# internal and external traffic separate named listeners on separate ports
listeners=INTERNAL://internal-kafka:9093,EXTERNAL://external-kafka:9094
advertised.listeners=INTERNAL://internal-kafka:9093,EXTERNAL://external-kafka:9094
listener.security.protocol.map=INTERNAL:SSL,EXTERNAL:SSL
# Replication and controller traffic stay on the internal listener; when this is set,
# security.inter.broker.protocol must be left unset
inter.broker.listener.name=INTERNAL
Python Client Security
from kafka import KafkaProducer, KafkaConsumer

# Producer with SSL and SASL/SCRAM (kafka-python uses the sasl_plain_* parameter names for SCRAM as well)
producer = KafkaProducer(
    bootstrap_servers=['kafka1:9094'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='producer',
    sasl_plain_password='producer-secret',
    ssl_cafile='/etc/kafka/client/ca-cert.pem',        # CA that signed the broker certificate
    ssl_certfile='/etc/kafka/client/client-cert.pem',  # client certificate, only needed if the listener requires it
    ssl_keyfile='/etc/kafka/client/client-key.pem'
)

# Consumer with SSL and SASL/SCRAM; identity comes from the SCRAM credential,
# the CA file is only used to verify the broker
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['kafka1:9094'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='consumer',
    sasl_plain_password='consumer-secret',
    ssl_cafile='/etc/kafka/client/ca-cert.pem',
    group_id='order-processor'  # must match the group the consumer has a Read ACL for
)
Follow-Up Questions
- What is the difference between SASL/PLAIN and SASL/SCRAM?
- How do ACLs work with wildcard patterns?
- Explain the purpose of allow.everyone.if.no.acl.found=false.
- How would you rotate SSL certificates without downtime?
- What are the security considerations for multi-tenant Kafka clusters?