Kafka Security & ACLs


Difficulty: Senior Level | Companies: LinkedIn, Uber, Netflix, Spotify, Confluent

Content

Kafka security encompasses authentication, authorization, encryption, and audit logging. None of them is on by default: a stock broker listens on PLAINTEXT with no authorizer configured, so any client that can reach the port can read and write every topic. Understanding these mechanisms is critical for production deployments.

Security Architecture

Architecture Diagram
Security Layers:
├── Authentication (SASL or mutual TLS)
│   ├── SASL/PLAIN
│   ├── SASL/SCRAM
│   ├── SASL/GSSAPI (Kerberos)
│   ├── SASL/OAUTHBEARER
│   └── Mutual TLS (client certificates, ssl.client.auth=required)
├── Authorization (ACLs)
│   ├── Topic-level permissions
│   ├── Cluster-level permissions
│   └── Consumer group permissions
├── Encryption (SSL/TLS)
│   ├── In-transit encryption
│   └── Certificate management
└── Audit Logging
    ├── Access logs
    └── Operation logs

SSL/TLS Configuration

Broker Configuration

# server.properties
listeners=SSL://kafka1:9093
advertised.listeners=SSL://kafka1:9093

# SSL settings. The ${...} values are placeholders for secrets injected by a
# secrets manager or a Kafka config provider; a plain server.properties file
# does not expand them, so never commit real passwords in their place.
ssl.keystore.location=/etc/kafka/secrets/kafka.server.keystore.jks
ssl.keystore.password=${keystore.password}
ssl.key.password=${key.password}
ssl.truststore.location=/etc/kafka/secrets/kafka.server.truststore.jks
ssl.truststore.password=${truststore.password}

# Require client certificates (mutual TLS). The client's certificate DN
# becomes its principal unless ssl.principal.mapping.rules rewrites it.
ssl.client.auth=required

# Brokers authenticate to each other over the same SSL listener
security.inter.broker.protocol=SSL

Client Configuration

# Producer properties
bootstrap.servers=kafka1:9093
security.protocol=SSL
ssl.truststore.location=/etc/kafka/client/truststore.jks
ssl.truststore.password=${truststore.password}
ssl.keystore.location=/etc/kafka/client/keystore.jks
ssl.keystore.password=${keystore.password}

Java Client with SSL

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9093");
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "/etc/kafka/client/truststore.jks");
// Read store passwords from the environment rather than hard-coding them
// ("changeit" is the JDK default and the first thing an attacker tries)
props.put("ssl.truststore.password", System.getenv("TRUSTSTORE_PASSWORD"));
props.put("ssl.keystore.location", "/etc/kafka/client/keystore.jks");
props.put("ssl.keystore.password", System.getenv("KEYSTORE_PASSWORD"));
// KafkaProducer refuses to construct without serializers
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

SASL/SCRAM Authentication

Broker Configuration

# server.properties
listeners=SASL_SSL://kafka1:9094
advertised.listeners=SASL_SSL://kafka1:9094
# The ssl.keystore.* / ssl.truststore.* settings from the SSL section are
# still required: SASL_SSL is SASL authentication inside a TLS connection.

# SASL configuration
sasl.enabled.mechanisms=SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
security.inter.broker.protocol=SASL_SSL

# JAAS configuration. On a broker a bare sasl.jaas.config is ignored (the
# broker logs that it "should be prefixed with SASL mechanism name"); the key
# must carry the listener name and mechanism. These are the credentials the
# brokers use to authenticate to each other.
listener.name.sasl_ssl.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="admin" \
  password="admin-secret";

Create SCRAM Credentials

# The broker's own SCRAM user (admin above) must exist before brokers can
# authenticate to each other: create it with --zookeeper before startup in
# ZooKeeper mode, or with kafka-storage.sh format --add-scram in KRaft mode.
# Client users can then be created over the authenticated listener;
# --command-config points at a client properties file with the admin credentials.

# Create producer user
kafka-configs.sh --bootstrap-server kafka1:9094 --command-config admin.properties \
  --alter --add-config 'SCRAM-SHA-512=[password=producer-secret]' \
  --entity-type users --entity-name producer

# Create consumer user
kafka-configs.sh --bootstrap-server kafka1:9094 --command-config admin.properties \
  --alter --add-config 'SCRAM-SHA-512=[password=consumer-secret]' \
  --entity-type users --entity-name consumer

Client Configuration

# Producer with SASL/SCRAM
bootstrap.servers=kafka1:9094
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="producer" \
  password="producer-secret";

⚠️

Important: Always use SASL_SSL, never SASL_PLAINTEXT, in production. SCRAM itself never transmits the password, but on a SASL_PLAINTEXT listener every record after authentication, and the credentials of any PLAIN client, cross the network unencrypted.
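
What kafka-configs.sh actually stores for a SCRAM user, and why a stolen copy of it does not hand over the password, is clearest in the RFC 5802 arithmetic. The following Python walk-through of a SCRAM-SHA-512 exchange is an example added here, not Kafka's implementation; the user name, nonces and passwords in it are constructed for the demonstration.

```python
import base64
import hashlib
import hmac
import secrets

# SCRAM-SHA-512 as used by Kafka's SASL/SCRAM: the broker persists only
# salt, iteration count, StoredKey and ServerKey for a user. Neither the
# password nor ClientKey is stored, and the password never crosses the wire.
HASH = "sha512"


def _h(data: bytes) -> bytes:
    return hashlib.new(HASH, data).digest()


def _hmac(key: bytes, msg: bytes) -> bytes:
    return hmac.new(key, msg, HASH).digest()


def _xor(a: bytes, b: bytes) -> bytes:
    return bytes(x ^ y for x, y in zip(a, b))


def derive_keys(password: str, salt: bytes, iterations: int) -> dict:
    """RFC 5802 key derivation; the client runs this on every login."""
    salted = hashlib.pbkdf2_hmac(HASH, password.encode(), salt, iterations)
    client_key = _hmac(salted, b"Client Key")
    return {
        "client_key": client_key,
        "stored_key": _h(client_key),
        "server_key": _hmac(salted, b"Server Key"),
    }


def create_credential(password: str, iterations: int = 4096) -> dict:
    """The record the broker keeps for a user: no plaintext, no ClientKey."""
    salt = secrets.token_bytes(16)
    keys = derive_keys(password, salt, iterations)
    return {"salt": salt, "iterations": iterations,
            "stored_key": keys["stored_key"], "server_key": keys["server_key"]}


def client_proof(password: str, salt: bytes, iterations: int, auth_message: bytes) -> bytes:
    """Client side: ClientProof = ClientKey XOR HMAC(StoredKey, AuthMessage)."""
    keys = derive_keys(password, salt, iterations)
    return _xor(keys["client_key"], _hmac(keys["stored_key"], auth_message))


def server_verify(credential: dict, auth_message: bytes, proof: bytes) -> bool:
    """Server side: undo the XOR to recover ClientKey, then check H(ClientKey)."""
    recovered_client_key = _xor(proof, _hmac(credential["stored_key"], auth_message))
    return hmac.compare_digest(_h(recovered_client_key), credential["stored_key"])


def scram_exchange(credential: dict, password_attempt: str) -> bool:
    """Simulate client-first / server-first / client-final / server-final.
    AuthMessage is the three messages (without the proof) joined by commas.
    Nonces and the user name are constructed for this example."""
    client_nonce = base64.b64encode(secrets.token_bytes(12)).decode()
    server_nonce = client_nonce + base64.b64encode(secrets.token_bytes(12)).decode()
    client_first_bare = f"n=producer,r={client_nonce}".encode()
    server_first = (f"r={server_nonce},s={base64.b64encode(credential['salt']).decode()},"
                    f"i={credential['iterations']}").encode()
    client_final_bare = f"c=biws,r={server_nonce}".encode()  # "biws" = base64("n,,")
    auth_message = client_first_bare + b"," + server_first + b"," + client_final_bare

    proof = client_proof(password_attempt, credential["salt"], credential["iterations"], auth_message)
    if not server_verify(credential, auth_message, proof):
        return False  # broker answers with an authentication failure
    # Mutual authentication: the client checks the server's signature against
    # its own ServerKey, so a rogue broker without the record cannot pass.
    server_signature = _hmac(credential["server_key"], auth_message)
    attempt = derive_keys(password_attempt, credential["salt"], credential["iterations"])
    return hmac.compare_digest(_hmac(attempt["server_key"], auth_message), server_signature)


if __name__ == "__main__":
    cred = create_credential("producer-secret")
    print("fields stored by the broker:", sorted(cred))
    print("correct password accepted:", scram_exchange(cred, "producer-secret"))
    print("wrong password accepted:  ", scram_exchange(cred, "wrong"))
```

An attacker who dumps the stored record gets StoredKey and ServerKey; to impersonate the user they would still need ClientKey, which only the password-derived PBKDF2 output yields, so the iteration count (Kafka's default is 4096) is what stands between a leaked record and an offline guess. This is the practical difference from SASL/PLAIN, where the broker must hold, and the client must send, the password itself.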

ACL Configuration

Enable ACLs

# server.properties (ZooKeeper mode; KRaft mode uses
# org.apache.kafka.metadata.authorizer.StandardAuthorizer instead)
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:admin
# Deny any request with no matching ACL. false is already the default;
# setting it explicitly documents the intent.
allow.everyone.if.no.acl.found=false
# With mutual TLS the principal is the certificate's full DN; map it to the
# short names used in the ACLs below (here: the CN)
ssl.principal.mapping.rules=RULE:^CN=([^,]+).*$/$1/,DEFAULT

ACL Commands

# kafka-acls.sh is an ordinary client: against the mTLS listener add
# --command-config <file> with the admin keystore/truststore settings.

# Allow producer to write to topic
kafka-acls.sh --bootstrap-server kafka1:9093 \
  --add --allow-principal User:producer \
  --operation Write --topic orders

# Allow consumer to read from topic
kafka-acls.sh --bootstrap-server kafka1:9093 \
  --add --allow-principal User:consumer \
  --operation Read --topic orders

# Allow consumer group
kafka-acls.sh --bootstrap-server kafka1:9093 \
  --add --allow-principal User:consumer \
  --operation Read --group order-processor

# Allow a principal to create topics of any name. The resource type is given
# by the flag (--topic, --group, --cluster, --transactional-id); there is no
# --resource-type option. '*' is a literal wildcard matching every topic name.
# (admin is already in super.users and bypasses ACLs; the command is shown
# for the shape of the rule.)
kafka-acls.sh --bootstrap-server kafka1:9093 \
  --add --allow-principal User:admin \
  --operation Create --topic '*'

# List ACLs
kafka-acls.sh --bootstrap-server kafka1:9093 \
  --list --topic orders

# Remove ACL
kafka-acls.sh --bootstrap-server kafka1:9093 \
  --remove --allow-principal User:producer \
  --operation Write --topic orders

ACL Patterns

# Wildcard and prefix patterns
--topic '*'                                       # every topic (the literal wildcard name)
--group '*'                                       # every consumer group
--resource-pattern-type prefixed --topic orders   # every topic whose name starts with "orders"
# Note: --topic 'orders*' with the default literal pattern type is an ACL for
# a topic literally named "orders*", not a prefix match.

# Resource types (each has its own flag)
--topic <name>
--group <name>
--cluster
--transactional-id <name>

# Operations
--operation Read
--operation Write
--operation Create
--operation Delete
--operation Alter
--operation Describe
--operation All
# An Allow for Read, Write, Delete or Alter also satisfies Describe, so a
# consumer needs no separate Describe ACL to fetch metadata. Denies are not
# widened this way, and an explicit Deny always beats an Allow.
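
The decision rules behind these flags are easier to see in code than in the option list. The following Python model of the AclAuthorizer decision order (super users first, then explicit denies, then allows with implied Describe, then the allow.everyone.if.no.acl.found fallback for resources no ACL mentions) is an example added here, not Kafka code; the ACL set mirrors the commands above, and the analyst principal, the hosts and the orders-pii topic are constructed for the demonstration.

```python
from dataclasses import dataclass, field

ALLOW, DENY = "allow", "deny"
LITERAL, PREFIXED = "literal", "prefixed"

# An Allow for Read/Write/Delete/Alter also satisfies Describe, and an Allow
# for AlterConfigs satisfies DescribeConfigs. Denies are not widened this way.
IMPLIED_BY = {
    "Describe": {"Read", "Write", "Delete", "Alter"},
    "DescribeConfigs": {"AlterConfigs"},
}


@dataclass(frozen=True)
class Acl:
    principal: str      # "User:producer", or "User:*" for any principal
    host: str           # client IP, or "*" for any host
    operation: str      # "Read", "Write", "Create", ..., or "All"
    permission: str     # ALLOW or DENY
    resource_type: str  # "topic", "group", "cluster", "transactional-id"
    name: str           # resource name; the literal "*" matches every name
    pattern: str = LITERAL


@dataclass
class Authorizer:
    acls: list
    super_users: set = field(default_factory=set)
    allow_everyone_if_no_acl_found: bool = False

    def _resource_matches(self, acl, resource_type, name):
        if acl.resource_type != resource_type:
            return False
        if acl.pattern == PREFIXED:
            return name.startswith(acl.name)
        return acl.name in ("*", name)

    def _action_matches(self, acl, principal, host, operations):
        return (acl.principal in (principal, "User:*")
                and acl.host in (host, "*")
                and (acl.operation == "All" or acl.operation in operations))

    def authorize(self, principal, host, operation, resource_type, name):
        if principal in self.super_users:
            return True
        matching = [a for a in self.acls if self._resource_matches(a, resource_type, name)]
        if not matching:
            # Nothing at all is defined for this resource: cluster-wide default applies
            return self.allow_everyone_if_no_acl_found
        if any(a.permission == DENY and self._action_matches(a, principal, host, {operation})
               for a in matching):
            return False  # an explicit deny always wins over any allow
        allowed_ops = {operation} | IMPLIED_BY.get(operation, set())
        return any(a.permission == ALLOW and self._action_matches(a, principal, host, allowed_ops)
                   for a in matching)


# The ACLs from the kafka-acls.sh commands above, plus a prefixed ACL and a
# deny (constructed here) to show pattern matching and deny precedence.
PAGE_ACLS = [
    Acl("User:producer", "*", "Write", ALLOW, "topic", "orders"),
    Acl("User:consumer", "*", "Read", ALLOW, "topic", "orders"),
    Acl("User:consumer", "*", "Read", ALLOW, "group", "order-processor"),
    Acl("User:analyst", "*", "Read", ALLOW, "topic", "orders", PREFIXED),
    Acl("User:analyst", "*", "Read", DENY, "topic", "orders-pii"),
]


def demo():
    strict = Authorizer(PAGE_ACLS, super_users={"User:admin"})
    permissive = Authorizer(PAGE_ACLS, super_users={"User:admin"},
                            allow_everyone_if_no_acl_found=True)
    return {
        "producer writes orders": strict.authorize("User:producer", "10.0.0.5", "Write", "topic", "orders"),
        "producer reads orders": strict.authorize("User:producer", "10.0.0.5", "Read", "topic", "orders"),
        "consumer describes orders (implied by Read)": strict.authorize("User:consumer", "10.0.0.6", "Describe", "topic", "orders"),
        "consumer joins group 'other'": strict.authorize("User:consumer", "10.0.0.6", "Read", "group", "other"),
        "analyst reads orders-2024 (prefixed)": strict.authorize("User:analyst", "10.0.0.7", "Read", "topic", "orders-2024"),
        "analyst reads orders-pii (deny wins)": strict.authorize("User:analyst", "10.0.0.7", "Read", "topic", "orders-pii"),
        "admin deletes orders (super user)": strict.authorize("User:admin", "10.0.0.1", "Delete", "topic", "orders"),
        "consumer describes cluster, strict": strict.authorize("User:consumer", "10.0.0.6", "Describe", "cluster", "kafka-cluster"),
        "consumer describes cluster, permissive": permissive.authorize("User:consumer", "10.0.0.6", "Describe", "cluster", "kafka-cluster"),
    }


if __name__ == "__main__":
    for check, decision in demo().items():
        print(f"{'ALLOW' if decision else 'DENY':5}  {check}")
```

The last two checks are the whole argument for allow.everyone.if.no.acl.found=false: the cluster resource has no ACL at all, and with the permissive setting any authenticated principal may describe it (and, for other operations, alter it). The deny on orders-pii shows why prefixed grants need an explicit carve-out rather than relying on naming discipline.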

Security Best Practices

1. Principle of Least Privilege

# Producer: only write to a specific topic (Write also implies Describe).
kafka-acls.sh --add --allow-principal User:producer \
  --operation Write --topic orders
# Equivalent convenience form: --producer --topic orders grants Write, Describe
# and Create on the topic. Transactional producers additionally need Write on
# their --transactional-id.

# Consumer: only read from a specific topic and group
kafka-acls.sh --add --allow-principal User:consumer \
  --operation Read --topic orders
kafka-acls.sh --add --allow-principal User:consumer \
  --operation Read --group order-processor
# Equivalent convenience form: --consumer --topic orders --group order-processor

# Topic owner: full access to specific topics only, never the cluster resource.
# A principal listed in super.users bypasses ACLs entirely, so keep that list
# to break-glass identities rather than day-to-day admins.
kafka-acls.sh --add --allow-principal User:admin \
  --operation All --topic orders

2. Audit Logging

# Kafka has no audit.log.enabled setting, and log.dirs is where partition data
# lives, not where log files go. Authorization decisions are written by the
# authorizer to the kafka.authorizer.logger category: denials at INFO, allowed
# requests at DEBUG. Route that category to its own file in log4j.properties
# (the stock Kafka distribution ships this appender):

# log4j.properties
log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender
log4j.additivity.kafka.authorizer.logger=false
log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

# Ship kafka-authorizer.log to the SIEM. Raise the logger to DEBUG only when a
# record of allowed operations is required; at high throughput it is noisy.
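
Once the authorizer log is on disk it is a detection source, not just an audit trail. The following Python script is an example added here, not a Kafka tool. Its log lines are constructed to follow the layout AclAuthorizer emits ("[timestamp] LEVEL Principal = <p> is Allowed|Denied Operation = <op> from host = <h> on resource = <Type:PATTERN:name> for request = <api> with resourceRefCount = <n> (kafka.authorizer.logger)"), and the enumeration threshold is an arbitrary choice for the demonstration.

```python
import re
from collections import defaultdict

# Constructed sample lines in the AclAuthorizer audit message layout. On a real
# broker denials are logged at INFO and allowed requests only at DEBUG.
SAMPLE_LOG = """\
[2024-03-01 10:15:42,118] DEBUG Principal = User:producer is Allowed Operation = Write from host = 10.0.0.5 on resource = Topic:LITERAL:orders for request = Produce with resourceRefCount = 1 (kafka.authorizer.logger)
[2024-03-01 10:15:43,002] INFO Principal = User:producer is Denied Operation = Read from host = 10.0.0.5 on resource = Topic:LITERAL:orders for request = Fetch with resourceRefCount = 1 (kafka.authorizer.logger)
[2024-03-01 10:16:01,447] INFO Principal = User:consumer is Denied Operation = Describe from host = 10.0.0.9 on resource = Topic:LITERAL:payments for request = Metadata with resourceRefCount = 1 (kafka.authorizer.logger)
[2024-03-01 10:16:01,448] INFO Principal = User:consumer is Denied Operation = Describe from host = 10.0.0.9 on resource = Topic:LITERAL:users for request = Metadata with resourceRefCount = 1 (kafka.authorizer.logger)
[2024-03-01 10:16:01,449] INFO Principal = User:consumer is Denied Operation = Describe from host = 10.0.0.9 on resource = Topic:LITERAL:secrets for request = Metadata with resourceRefCount = 1 (kafka.authorizer.logger)
[2024-03-01 10:16:05,000] INFO Principal = User:consumer is Denied Operation = Create from host = 10.0.0.9 on resource = Cluster:LITERAL:kafka-cluster for request = CreateTopics with resourceRefCount = 1 (kafka.authorizer.logger)
[2024-03-01 10:16:09,311] DEBUG Principal = User:consumer is Allowed Operation = Read from host = 10.0.0.6 on resource = Group:LITERAL:order-processor for request = JoinGroup with resourceRefCount = 1 (kafka.authorizer.logger)
"""

LINE_RE = re.compile(
    r"^\[(?P<ts>[^\]]+)\] (?P<level>\w+) Principal = (?P<principal>\S+) is "
    r"(?P<result>Allowed|Denied) Operation = (?P<operation>\w+) from host = (?P<host>\S+) "
    r"on resource = (?P<rtype>\w+):(?P<pattern>\w+):(?P<rname>.+?) "
    r"for request = (?P<request>\w+) with resourceRefCount = (?P<refcount>\d+)"
)


def parse_audit_lines(text):
    """Turn authorizer log lines into dicts; lines that do not match are skipped."""
    return [m.groupdict() for m in map(LINE_RE.match, text.splitlines()) if m]


def denials_by_principal(events):
    """Denied requests per principal: the first number to put on a dashboard."""
    counts = defaultdict(int)
    for e in events:
        if e["result"] == "Denied":
            counts[e["principal"]] += 1
    return dict(counts)


def topic_enumeration(events, min_distinct_topics=3):
    """Flag (principal, host) pairs denied Describe/Read on several distinct
    topics: a client walking the topic namespace looks like this, while a
    misconfigured client rarely probes more than one or two names."""
    probed = defaultdict(set)
    for e in events:
        if (e["result"] == "Denied" and e["rtype"] == "Topic"
                and e["operation"] in ("Describe", "Read")):
            probed[(e["principal"], e["host"])].add(e["rname"])
    return {k: sorted(v) for k, v in probed.items() if len(v) >= min_distinct_topics}


def cluster_denials(events):
    """Denied cluster-level operations (topic creation, ACL or config changes)
    warrant an alert on their own, whatever the volume."""
    return [(e["principal"], e["host"], e["operation"], e["request"])
            for e in events if e["result"] == "Denied" and e["rtype"] == "Cluster"]


if __name__ == "__main__":
    events = parse_audit_lines(SAMPLE_LOG)
    print("denials per principal:     ", denials_by_principal(events))
    print("possible topic enumeration:", topic_enumeration(events))
    print("cluster-level denials:     ", cluster_denials(events))
```

The one producer denial on orders is the benign case (a producer that also tried to consume its own topic), while the consumer at 10.0.0.9 probing three topics and then attempting CreateTopics is the pattern worth paging on; note that a host other than the consumer's usual one appears in the same run, which the regex exposes as a separate field for exactly this kind of correlation.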

3. Network Security

# One listener per network, each with its own security protocol.
# (internal.listeners / external.listeners are not Kafka settings.)
listeners=INTERNAL://0.0.0.0:9093,EXTERNAL://0.0.0.0:9094
advertised.listeners=INTERNAL://kafka1.internal:9093,EXTERNAL://kafka1.example.com:9094
listener.security.protocol.map=INTERNAL:SSL,EXTERNAL:SASL_SSL

# Brokers replicate over the internal listener. This replaces
# security.inter.broker.protocol; the two settings cannot both be present.
inter.broker.listener.name=INTERNAL

# Firewall the internal port to brokers and in-network clients only, and never
# expose a PLAINTEXT listener beyond the broker's own host.

Python Client Security

import os

from kafka import KafkaProducer, KafkaConsumer

# kafka-python uses sasl_plain_username / sasl_plain_password for SCRAM as
# well as PLAIN. Passwords come from the environment (or a secrets manager),
# not from source. ssl_cafile pins the CA that signed the broker certificates;
# hostname verification is on by default (ssl_check_hostname=True).
producer = KafkaProducer(
    bootstrap_servers=['kafka1:9094'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='producer',
    sasl_plain_password=os.environ['PRODUCER_PASSWORD'],
    ssl_cafile='/etc/kafka/client/ca-cert.pem',
    # Client certificate: only required when the listener sets ssl.client.auth=required
    ssl_certfile='/etc/kafka/client/client-cert.pem',
    ssl_keyfile='/etc/kafka/client/client-key.pem'
)

# Consumer with SSL and SASL; group_id must match the group the consumer's
# ACL grants Read on
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['kafka1:9094'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='consumer',
    sasl_plain_password=os.environ['CONSUMER_PASSWORD'],
    ssl_cafile='/etc/kafka/client/ca-cert.pem',
    group_id='order-processor'
)

Follow-Up Questions

  1. What is the difference between SASL/PLAIN and SASL/SCRAM?
  2. How do ACLs work with wildcard patterns?
  3. Explain the purpose of allow.everyone.if.no.acl.found=false.
  4. How would you rotate SSL certificates without downtime?
  5. What are the security considerations for multi-tenant Kafka clusters?
