
Kafka Security: SASL, SSL, ACLs, Encryption at Rest

Difficulty: Senior | Asked at: Finance, Healthcare, Enterprise companies

ℹ️Interview Context

Security is critical for production Kafka deployments. Interviewers expect you to understand authentication, authorization, encryption, and how to implement defense-in-depth strategies.

The Question

Explain how to secure a Kafka cluster. What are the different authentication mechanisms (SASL)? How do SSL/TLS encryption and ACL authorization work? How do you implement encryption at rest?

Security Architecture Overview

Architecture Diagram
Producer/Broker/Consumer
     |
     +--> Authentication (SASL)
     |    Proves identity
     |
     +--> Encryption (SSL/TLS)
     |    Protects data in transit
     |
     +--> Authorization (ACLs)
     |    Controls what authenticated users can do
     |
     +--> Encryption at Rest
     |    Protects data on disk
     |
     +--> Audit Logging
          Records all access
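
The first two layers in the diagram are chosen together by the security protocol of each listener. As a rough sketch, the mapping can be written down as a lookup table. The four protocol names are Kafka's own, while `PROTOCOL_LAYERS` and `missing_layers` are hypothetical names used only for this illustration.

```python
# Which transport-level layers each Kafka security protocol provides.
# Note: SSL with ssl.client.auth=required also authenticates clients (mutual TLS),
# but it does so with certificates rather than SASL.
PROTOCOL_LAYERS = {
    "PLAINTEXT":      {"sasl_authentication": False, "tls_encryption": False},
    "SSL":            {"sasl_authentication": False, "tls_encryption": True},
    "SASL_PLAINTEXT": {"sasl_authentication": True,  "tls_encryption": False},
    "SASL_SSL":       {"sasl_authentication": True,  "tls_encryption": True},
}


def missing_layers(protocol):
    """Return the sorted names of the layers a protocol does not provide."""
    try:
        layers = PROTOCOL_LAYERS[protocol]
    except KeyError:
        raise ValueError(f"unknown security protocol: {protocol!r}") from None
    return sorted(name for name, present in layers.items() if not present)


for name in PROTOCOL_LAYERS:
    print(name, missing_layers(name))
```

Authorization, encryption at rest, and audit logging are configured separately, so even SASL_SSL covers only part of the picture.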

SASL Authentication Mechanisms

SASL/PLAIN

# broker.properties
listeners=SASL_PLAINTEXT://0.0.0.0:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

# JAAS configuration (kafka_server_jaas.conf)
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_producer="producer-secret"
    user_consumer="consumer-secret";
};

# Producer with SASL/PLAIN
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='PLAIN',
    sasl_plain_username='producer',
    sasl_plain_password='producer-secret'
)

SASL/SCRAM

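# broker.properties
listeners=SASL_SSL://0.0.0.0:9093
security.inter.broker.protocol=SASL_SSL
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256

# Create SCRAM credentials
# (older ZooKeeper-based clusters used --zookeeper localhost:2181 instead of --bootstrap-server)
# kafka-configs.sh --bootstrap-server localhost:9093 \
#   --command-config admin.properties \
#   --alter --add-config 'SCRAM-SHA-256=[password=alice-secret]' \
#   --entity-type users --entity-name alice

# Producer with SASL/SCRAM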
producer = KafkaProducer(
    bootstrap_servers='localhost:9093',
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_username='alice',
    sasl_plain_password='alice-secret',
    ssl_cafile='/path/to/ca.crt'
)
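
SCRAM is stronger than PLAIN largely because the broker never stores or receives the password itself, only values derived from it. The sketch below follows the key derivation in RFC 5802 (with SHA-256 as in RFC 7677) using the standard library. It assumes an ASCII password and skips the SASLprep normalization the RFC requires, and `derive_scram_keys` is a hypothetical helper name, not part of Kafka or kafka-python.

```python
import hashlib
import hmac
import os


def derive_scram_keys(password, salt, iterations=4096):
    """Derive the values a SCRAM-SHA-256 server stores for one user."""
    # SaltedPassword := Hi(password, salt, i), which is PBKDF2 with HMAC-SHA-256
    salted = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations)
    client_key = hmac.new(salted, b"Client Key", hashlib.sha256).digest()
    server_key = hmac.new(salted, b"Server Key", hashlib.sha256).digest()
    # StoredKey := H(ClientKey); the server keeps StoredKey and ServerKey only
    stored_key = hashlib.sha256(client_key).digest()
    return {
        "salt": salt,
        "iterations": iterations,
        "stored_key": stored_key,
        "server_key": server_key,
    }


record = derive_scram_keys("alice-secret", os.urandom(16))
print(record["stored_key"].hex())
```

Because each user gets a random salt, two users with the same password end up with different stored values.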

SASL/GSSAPI (Kerberos)

# broker.properties
listeners=SASL_SSL://0.0.0.0:9093
security.inter.broker.protocol=SASL_SSL
sasl.enabled.mechanisms=GSSAPI
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.kerberos.service.name=kafka

# Producer with SASL/GSSAPI
# kafka-python takes no keytab or principal arguments; it uses the Kerberos
# credentials already available to the process, for example after:
#   kinit -kt /path/to/kafka.keytab kafka/hostname@REALM
producer = KafkaProducer(
    bootstrap_servers='localhost:9093',
    security_protocol='SASL_SSL',
    sasl_mechanism='GSSAPI',
    sasl_kerberos_service_name='kafka'
)

SASL/OAUTHBEARER

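# broker.properties
listeners=SASL_SSL://0.0.0.0:9093
security.inter.broker.protocol=SASL_SSL
sasl.enabled.mechanisms=OAUTHBEARER
sasl.mechanism.inter.broker.protocol=OAUTHBEARER

# Producer with SASL/OAUTHBEARER
# kafka-python expects a provider object that exposes token(). The base class is
# kafka.sasl.oauth.AbstractTokenProvider in recent releases (kafka.oauth.abstract
# in 2.0.x); check the import path for your installed version.
import requests
from kafka import KafkaProducer
from kafka.sasl.oauth import AbstractTokenProvider

class ClientCredentialsTokenProvider(AbstractTokenProvider):
    """Fetch an access token with the OAuth 2.0 client-credentials grant."""

    def token(self):
        response = requests.post(
            'https://auth.example.com/token',
            data={'grant_type': 'client_credentials', 'scope': 'kafka'},
            auth=('kafka-client', 'client-secret'),
            timeout=10
        )
        response.raise_for_status()
        return response.json()['access_token']

producer = KafkaProducer(
    bootstrap_servers='localhost:9093',
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=ClientCredentialsTokenProvider()
)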

SASL Mechanism Comparison

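| Mechanism     | Security        | Complexity | Use Case              |
|---------------|-----------------|------------|-----------------------|
| PLAIN         | Low (plaintext) | Low        | Development           |
| SCRAM-SHA-256 | Medium          | Medium     | Production (simple)   |
| SCRAM-SHA-512 | High            | Medium     | Production (enhanced) |
| GSSAPI        | High            | High       | Enterprise (Kerberos) |
| OAUTHBEARER   | High            | High       | Cloud/Microservices   |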

⚠️SASL/PLAIN Warning

SASL/PLAIN sends the username and password unencrypted. Always pair it with SSL/TLS encryption (a SASL_SSL listener), and never expose a SASL_PLAINTEXT listener in production.
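
To see why this matters, it helps to look at what the mechanism actually puts on the wire. RFC 4616 defines the SASL/PLAIN message as the authorization identity, the username, and the password separated by NUL bytes. The helper below is a hypothetical illustration of that layout, not code taken from any Kafka client.

```python
def sasl_plain_initial_response(username, password, authzid=""):
    """Build the SASL/PLAIN message from RFC 4616: authzid NUL authcid NUL passwd."""
    parts = (authzid, username, password)
    return b"\x00".join(part.encode("utf-8") for part in parts)


payload = sasl_plain_initial_response("producer", "producer-secret")
print(payload)  # b'\x00producer\x00producer-secret'
```

Anyone who can capture traffic on a SASL_PLAINTEXT listener can read the password straight out of these bytes.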

SSL/TLS Encryption

SSL Configuration

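# broker.properties
listeners=SSL://0.0.0.0:9093
security.inter.broker.protocol=SSL
ssl.keystore.location=/var/kafka/ssl/kafka.server.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.location=/var/kafka/ssl/kafka.server.truststore.jks
ssl.truststore.password=truststore-password
# Use "required" for mutual TLS, or "requested" to make client certificates optional.
# Properties files do not support trailing comments, so keep comments on their own line.
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.3
# The first two suites apply to TLS 1.3, the last one to TLS 1.2
ssl.cipher.suites=TLS_AES_256_GCM_SHA384,TLS_CHACHA20_POLY1305_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
ssl.endpoint.identification.algorithm=https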

Certificate Generation

# Generate CA key and certificate
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 \
  -subj "/CN=Kafka CA"

# Generate broker keystore
keytool -keystore kafka.server.keystore.jks \
  -alias kafka -validity 365 -genkey \
  -keyalg RSA -keysize 2048 \
  -dname "CN=kafka.example.com"

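# Create a certificate signing request (CSR) for the broker key
keytool -keystore kafka.server.keystore.jks \
  -alias kafka -certreq -file broker.csr

# Generate broker certificate signed by CA
openssl x509 -req -CA ca-cert -CAkey ca-key \
  -in broker.csr -out broker-cert \
  -days 365 -CAcreateserial

# Import CA cert into truststore
keytool -keystore kafka.server.truststore.jks \
  -alias CARoot -importcert -file ca-cert

# Import CA cert into keystore so the signed cert chains to a trusted root
keytool -keystore kafka.server.keystore.jks \
  -alias CARoot -importcert -file ca-cert

# Import signed broker cert into keystore
keytool -keystore kafka.server.keystore.jks \
  -alias kafka -importcert -file broker-cert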

SSL Client Configuration

# Producer with SSL
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='kafka1:9093',
    security_protocol='SSL',
    ssl_cafile='/path/to/ca.crt',
    ssl_certfile='/path/to/client.crt',
    ssl_keyfile='/path/to/client.key',
    # kafka-python has no ssl_cert_reqs option; supplying ssl_cafile already
    # makes server certificate verification mandatory
    ssl_check_hostname=True
)
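
When finer control over TLS settings is needed, one option is to build an `ssl.SSLContext` with the standard library and hand it to the client. The sketch below assumes the file paths are supplied by the caller, and `build_client_context` is a hypothetical helper name. kafka-python accepts such an object through its `ssl_context` parameter, in which case the other `ssl_*` options are ignored.

```python
import ssl


def build_client_context(cafile=None, certfile=None, keyfile=None):
    """Create a TLS client context that verifies the broker and requires TLS 1.2+."""
    # create_default_context turns on certificate verification and hostname checking;
    # with cafile=None it falls back to the system trust store
    context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=cafile)
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    if certfile:
        # Client certificate for mutual TLS (ssl.client.auth=required on the broker)
        context.load_cert_chain(certfile=certfile, keyfile=keyfile)
    return context


context = build_client_context()
print(context.verify_mode, context.check_hostname, context.minimum_version)
```

A producer would then be created with `security_protocol='SSL'` and `ssl_context=context`, passing the CA and client certificate paths from the example above to `build_client_context`.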

SSL Performance Formula

$$\text{SSL Overhead} = \text{Handshake Latency} + \text{Encryption CPU Cost}$$
$$\text{Throughput Reduction} = \left(1 - \frac{\text{Throughput}_{ssl}}{\text{Throughput}_{plain}}\right) \times 100\%$$

def estimate_ssl_overhead(
    throughput_plain_mbps: float,
    ssl_handshake_ms: float,
    connections_per_sec: int
) -> dict:
    """
    Estimate SSL performance overhead.
    """
    # Handshake overhead
    handshake_overhead_per_sec = connections_per_sec * ssl_handshake_ms / 1000
    
    # Typical SSL CPU overhead: 10-30%
    cpu_overhead_factor = 1.2  # 20% overhead
    
    # Throughput reduction
    throughput_ssl = throughput_plain_mbps / cpu_overhead_factor
    reduction_percent = (1 - throughput_ssl / throughput_plain_mbps) * 100
    
    return {
        'handshake_overhead_per_sec': handshake_overhead_per_sec,
        'throughput_plain_mbps': throughput_plain_mbps,
        'throughput_ssl_mbps': throughput_ssl,
        'throughput_reduction_percent': reduction_percent,
        'recommended_batch_size': 'Larger batches amortize SSL overhead'
    }

# Example
overhead = estimate_ssl_overhead(
    throughput_plain_mbps=500,
    ssl_handshake_ms=50,
    connections_per_sec=10
)
print(f"Throughput reduction: {overhead['throughput_reduction_percent']:.1f}%")

ℹ️SSL Optimization

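Reuse long-lived producer and consumer instances so that each broker connection pays the TLS handshake only once. Raise connections.max.idle.ms if idle connections are being closed and re-established too often. Use larger batch sizes to amortize the per-request encryption overhead.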

ACL Authorization

ACL Configuration

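# broker.properties
# AclAuthorizer applies to ZooKeeper-based clusters; KRaft clusters use
# org.apache.kafka.metadata.authorizer.StandardAuthorizer instead
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=false
super.users=User:admin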

ACL Commands

# Allow producer to write to topic
kafka-acls.sh --bootstrap-server localhost:9093 \
  --command-config admin.properties \
  --add --allow-principal User:producer \
  --operation Write --topic orders

# Allow consumer to read from topic
kafka-acls.sh --bootstrap-server localhost:9093 \
  --command-config admin.properties \
  --add --allow-principal User:consumer \
  --operation Read --topic orders \
  --group consumer-group

# Allow admin to manage topics
kafka-acls.sh --bootstrap-server localhost:9093 \
  --command-config admin.properties \
  --add --allow-principal User:admin \
  --operation All --topic '*'

# List ACLs for a topic
kafka-acls.sh --bootstrap-server localhost:9093 \
  --command-config admin.properties \
  --list --topic orders
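
When many applications need grants, the same commands can be generated from a list of roles instead of typed by hand. The sketch below only assembles argument lists from the flags shown above. `acl_command` and the `grants` list are hypothetical, and actually running the commands (for example with `subprocess.run`) is left to the reader.

```python
import shlex


def acl_command(principal, operation, topic, group=None,
                bootstrap_server="localhost:9093",
                command_config="admin.properties"):
    """Build the kafka-acls.sh argument list for one ALLOW rule."""
    args = [
        "kafka-acls.sh",
        "--bootstrap-server", bootstrap_server,
        "--command-config", command_config,
        "--add",
        "--allow-principal", principal,
        "--operation", operation,
        "--topic", topic,
    ]
    if group is not None:
        # Consumers also need Read on their consumer group
        args += ["--group", group]
    return args


grants = [
    ("User:producer", "Write", "orders", None),
    ("User:consumer", "Read", "orders", "consumer-group"),
]
for principal, operation, topic, group in grants:
    print(shlex.join(acl_command(principal, operation, topic, group)))
```

Keeping the grants in a reviewed file makes it easier to audit who has access to which topic.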

ACL Resource Types

# ACL resource types and operations
acl_resources = {
    # Principals use the User: type by default; other types such as Group:
    # require a custom principal builder and authorizer
    'Topic': {
        'operations': ['Read', 'Write', 'Create', 'Delete', 'Alter', 'Describe',
                       'DescribeConfigs', 'AlterConfigs', 'All'],
        'examples': [
            'Allow User:X to Write to topic orders',
            'Allow User:Y to Read from topic orders'
        ]
    },
    'Group': {
        'operations': ['Read', 'Describe', 'Delete', 'All'],
        'examples': [
            'Allow User:consumer to Read from group my-group'
        ]
    },
    'Cluster': {
        'operations': ['Create', 'Alter', 'Describe', 'ClusterAction',
                       'DescribeConfigs', 'AlterConfigs', 'IdempotentWrite', 'All'],
        'examples': [
            'Allow User:admin to Create topics',
            'Allow User:admin to AlterConfigs on the cluster'
        ]
    },
    'TransactionalId': {
        'operations': ['Write', 'Describe', 'All'],
        'examples': [
            'Allow User:producer to use transactional.id my-txn'
        ]
    },
    'DelegationToken': {
        'operations': ['Describe', 'All'],
        'examples': [
            'Allow User:app to manage delegationToken'
        ]
    }
}

ACL Evaluation

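from collections import namedtuple

ACL = namedtuple('ACL', ['principal', 'resource', 'operation', 'permission'])

# Mirrors super.users in broker.properties
super_users = {'User:admin'}

def evaluate_acl(resource, operation, principal, acls, allow_if_no_acl=False):
    """
    Simplified model of how the authorizer decides a request.
    (Prefixed resource patterns and implied operations are not modeled.)

    ACL evaluation order:
    1. Super users always have access
    2. If no ACLs exist for the resource, the result is the value of
       allow.everyone.if.no.acl.found (allow_if_no_acl here)
    3. A matching DENY ACL rejects the request (DENY beats ALLOW)
    4. A matching ALLOW ACL grants the request
    5. If no matching ALLOW ACL, deny
    """
    # Check super users
    if principal in super_users:
        return True

    # Only ACLs attached to this resource are relevant
    resource_acls = [acl for acl in acls if acl.resource == resource]
    if not resource_acls:
        return allow_if_no_acl

    def matches(acl, permission):
        return (acl.principal in (principal, 'User:*') and
                acl.operation in (operation, 'All') and
                acl.permission == permission)

    # DENY takes precedence over ALLOW
    if any(matches(acl, 'DENY') for acl in resource_acls):
        return False
    return any(matches(acl, 'ALLOW') for acl in resource_acls)

# Example ACLs
acls = [
    ACL('User:producer', 'Topic:orders', 'Write', 'ALLOW'),
    ACL('User:consumer', 'Topic:orders', 'Read', 'ALLOW'),
    ACL('User:admin', 'Cluster:kafka-cluster', 'All', 'ALLOW'),
]

# Evaluation
print(evaluate_acl('Topic:orders', 'Write', 'User:producer', acls))       # True
print(evaluate_acl('Topic:orders', 'Read', 'User:producer', acls))        # False
print(evaluate_acl('Cluster:kafka-cluster', 'Alter', 'User:admin', acls)) # True (super user)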

⚠️ACL Best Practice

Start with allow.everyone.if.no.acl.found=false and explicitly grant permissions. This denies access by default, which is the secure approach.

Encryption at Rest

Broker-Level Encryption

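# Apache Kafka has no built-in encryption for log segments on disk, so
# encryption at rest comes from the layer underneath or from a vendor feature

# Option 1: Filesystem-level encryption (dm-crypt on Linux)
# Encrypt the entire data directory
# WARNING: luksFormat erases any existing data on the device
cryptsetup luksFormat /dev/sdb1
cryptsetup open /dev/sdb1 kafka-data
mkfs.ext4 /dev/mapper/kafka-data
mount /dev/mapper/kafka-data /var/kafka/data

# Option 2: Vendor-provided encryption (for example Confluent Platform)
# The property names below are illustrative; confirm the exact settings
# in the vendor documentation for your version
confluent.security.encryption.key=your-encryption-key
confluent.security.encryption.provider=AES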

Application-Level Encryption

import json

from cryptography.fernet import Fernet
from kafka import KafkaConsumer, KafkaProducer

class EncryptedProducer:
    """
    Encrypt messages before producing to Kafka.
    """
    
    def __init__(self, encryption_key):
        self.cipher = Fernet(encryption_key)
        self.producer = KafkaProducer(
            bootstrap_servers='localhost:9092',
            value_serializer=lambda v: v
        )
    
    def send_encrypted(self, topic, key, value):
        """Send encrypted message"""
        # Encrypt value
        encrypted = self.cipher.encrypt(json.dumps(value).encode())
        
        self.producer.send(topic, key=key, value=encrypted)
    
    def send_batch_encrypted(self, topic, messages):
        """Send batch of encrypted messages"""
        for key, value in messages:
            self.send_encrypted(topic, key, value)
        self.producer.flush()

class EncryptedConsumer:
    """
    Consumer that decrypts messages from Kafka.
    """
    
    def __init__(self, encryption_key):
        self.cipher = Fernet(encryption_key)
        self.consumer = KafkaConsumer(
            'encrypted-topic',
            value_deserializer=lambda v: v
        )
    
    def consume_decrypted(self):
        """Consume and decrypt messages"""
        for message in self.consumer:
            # Decrypt value
            decrypted = self.cipher.decrypt(message.value)
            value = json.loads(decrypted.decode())
            yield message.key, value

# Generate encryption key
key = Fernet.generate_key()
# Store key securely (Vault, KMS, etc.)

Field-Level Encryption

class FieldLevelEncryption:
    """
    Encrypt specific fields in messages.
    """
    
    def __init__(self, key_registry):
        self.key_registry = key_registry
    
    def encrypt_fields(self, message, fields_to_encrypt):
        """
        Encrypt specific fields in message.
        
        Useful for PII: SSN, email, phone, etc.
        """
        encrypted = message.copy()
        
        for field in fields_to_encrypt:
            if field in encrypted:
                key = self.key_registry.get_key(field)
                cipher = Fernet(key)
                encrypted[field] = cipher.encrypt(
                    str(encrypted[field]).encode()
                ).decode()
        
        return encrypted
    
    def decrypt_fields(self, message, fields_to_decrypt):
        """Decrypt specific fields"""
        decrypted = message.copy()
        
        for field in fields_to_decrypt:
            if field in decrypted:
                key = self.key_registry.get_key(field)
                cipher = Fernet(key)
                decrypted[field] = cipher.decrypt(
                    decrypted[field].encode()
                ).decode()
        
        return decrypted

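# Example usage
# Hypothetical in-memory registry for illustration; real keys belong in a KMS or Vault
class InMemoryKeyRegistry:
    def __init__(self):
        self._keys = {}

    def get_key(self, field):
        # One Fernet key per field name, created on first use
        if field not in self._keys:
            self._keys[field] = Fernet.generate_key()
        return self._keys[field]

encryption = FieldLevelEncryption(InMemoryKeyRegistry())

# Encrypt before producing
message = {
    'user_id': '12345',
    'ssn': '123-45-6789',
    'email': 'user@example.com',
    'amount': 100.00
}

encrypted_msg = encryption.encrypt_fields(message, ['ssn', 'email'])
# ssn and email are encrypted, user_id and amount are plaintext

# The value is a dict, so the producer needs a JSON serializer
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode()
)
producer.send('user-events', value=encrypted_msg)

# Consumer decrypts (received_msg stands for the dict deserialized from the topic)
received_msg = encrypted_msg
decrypted_msg = encryption.decrypt_fields(received_msg, ['ssn', 'email'])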

ℹ️Encryption Strategy

  • Transport: Use SSL/TLS for all data in transit
  • Disk: Use filesystem encryption for data at rest
  • Field-level: Encrypt sensitive fields (PII) at application level
  • Key management: Use a KMS (AWS KMS, HashiCorp Vault) for key rotation

Key Management

class KeyManagementService:
    """
    Key management for Kafka encryption.
    """
    
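    def __init__(self, kms_client):
        # kms_client is assumed to be boto3-style (encrypt/decrypt with keyword arguments)
        self.kms = kms_client
        self.key_cache = {}
        # Data keys wrapped (encrypted) by KMS, indexed by key_id.
        # Held in memory for illustration; persist them in durable storage in practice.
        self.wrapped_keys = {}
    
    def _fetch_wrapped_key(self, key_id):
        """Return the wrapped data key stored for key_id."""
        return self.wrapped_keys[key_id]
    
    def get_encryption_key(self, key_id):
        """
        Get encryption key from KMS.
        
        Cache keys to avoid KMS calls on every message.
        """
        if key_id not in self.key_cache:
            # Ask KMS to unwrap the stored data key
            response = self.kms.decrypt(
                CiphertextBlob=self._fetch_wrapped_key(key_id)
            )
            self.key_cache[key_id] = Fernet(response['Plaintext'])
        
        return self.key_cache[key_id]
    
    def rotate_key(self, key_id):
        """
        Rotate encryption key.
        
        Old messages still encrypted with old key.
        New messages use new key.
        
        Rotating under the same key_id replaces the stored key, so keep the
        previous key under a separate id if older messages must stay readable.
        """
        # Generate new key
        new_key = Fernet.generate_key()
        
        # Wrap the new key with KMS and keep only the ciphertext
        response = self.kms.encrypt(
            KeyId=key_id,
            Plaintext=new_key
        )
        self.wrapped_keys[key_id] = response['CiphertextBlob']
        
        # Invalidate cache
        self.key_cache.pop(key_id, None)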
    
    def decrypt_with_key_rotation(self, encrypted_data, key_id, old_key_id=None):
        """
        Handle decryption during key rotation.
        
        Try new key first, fall back to old key.
        """
        try:
            key = self.get_encryption_key(key_id)
            return key.decrypt(encrypted_data)
        except Exception:
            if old_key_id:
                old_key = self.get_encryption_key(old_key_id)
                return old_key.decrypt(encrypted_data)
            raise
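
The try-then-fall-back approach works for two keys but gets awkward once several generations of keys exist. A common alternative is to record which key encrypted each message so the consumer can select the right key directly. The sketch below prepends the key id to the ciphertext using a hypothetical frame layout (one version byte, a two-byte length, then the key id). Kafka record headers would be another place to carry the same information.

```python
import struct

FRAME_VERSION = 1
# version (1 byte) followed by the key-id length (2 bytes, big-endian)
_HEADER = struct.Struct(">BH")


def frame(key_id, ciphertext):
    """Prefix ciphertext with the id of the key that produced it."""
    key_bytes = key_id.encode("utf-8")
    return _HEADER.pack(FRAME_VERSION, len(key_bytes)) + key_bytes + ciphertext


def unframe(data):
    """Split a framed message into (key_id, ciphertext)."""
    version, key_len = _HEADER.unpack_from(data)
    if version != FRAME_VERSION:
        raise ValueError(f"unsupported frame version: {version}")
    start = _HEADER.size
    end = start + key_len
    if len(data) < end:
        raise ValueError("truncated frame")
    return data[start:end].decode("utf-8"), data[end:]


framed = frame("orders-key-v2", b"ciphertext-bytes")
print(unframe(framed))
```

On the consumer side, the key id returned by `unframe` can be passed to `get_encryption_key`, and the remaining bytes to that key's `decrypt` method.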

Audit Logging

import json
import time

from kafka import KafkaProducer

class AuditLogger:
    """
    Audit logging for Kafka operations.
    """
    
    def __init__(self, audit_topic):
        self.producer = KafkaProducer(
            bootstrap_servers='localhost:9092',
            value_serializer=lambda v: json.dumps(v).encode()
        )
        self.audit_topic = audit_topic
    
    def log_operation(self, user, operation, resource, result,
                      source_ip=None, user_agent=None):
        """Log a security-relevant operation"""
        # The caller supplies source_ip and user_agent when it knows them
        audit_record = {
            'timestamp': int(time.time() * 1000),
            'user': user,
            'operation': operation,
            'resource': resource,
            'result': result,
            'source_ip': source_ip,
            'user_agent': user_agent
        }
        
        self.producer.send(self.audit_topic, value=audit_record)
    
    def log_authentication(self, user, method, success, failure_reason=None):
        """Log authentication attempt"""
        self.log_operation(
            user=user,
            operation='AUTHENTICATE',
            resource='broker',
            result={
                'success': success,
                'method': method,
                'failure_reason': failure_reason
            }
        )
    
    def log_authorization(self, user, operation, resource, granted):
        """Log authorization check"""
        self.log_operation(
            user=user,
            operation=f'AUTHORIZE:{operation}',
            resource=resource,
            result={'granted': granted}
        )

⚠️Audit Requirements

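Many compliance frameworks (PCI DSS, HIPAA, SOC 2) require audit logging. Log all authentication attempts, authorization decisions, and administrative operations. Retention periods vary by framework (PCI DSS, for example, requires at least one year of audit history), so confirm the period that applies to your environment.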
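
If the audit records live in a Kafka topic, the topic's retention has to cover the required period, because Kafka's default retention is only seven days. A small sketch of computing the `retention.ms` topic setting for a one-year period follows. The helper name and the choice of 365 days are assumptions for illustration.

```python
from datetime import timedelta


def retention_ms(period):
    """Convert a retention period to the milliseconds value Kafka expects."""
    return int(period.total_seconds() * 1000)


# Topic-level overrides for a hypothetical audit topic
audit_topic_config = {
    "retention.ms": str(retention_ms(timedelta(days=365))),
    "cleanup.policy": "delete",
}
print(audit_topic_config)
```

These values can be applied when the topic is created or later with kafka-configs.sh.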

Security Configuration Matrix

| Component | Configuration                       | Purpose                     |
|-----------|-------------------------------------|-----------------------------|
| Broker    | listeners=SASL_SSL                  | Authentication + Encryption |
| Broker    | authorizer.class.name=AclAuthorizer | Authorization               |
| Broker    | ssl.client.auth=required            | Mutual TLS                  |
| Producer  | security_protocol=SASL_SSL          | Client auth + encryption    |
| Consumer  | security_protocol=SASL_SSL          | Client auth + encryption    |
| Admin     | command-config=admin.properties     | Admin credentials           |

Security Best Practices

# Production security checklist
security_checklist = {
    'Authentication': [
        'Use SASL/SCRAM or SASL/GSSAPI (not PLAIN)',
        'Rotate credentials regularly',
        'Use separate credentials per application',
        'Enable mutual TLS (ssl.client.auth=required)'
    ],
    'Encryption': [
        'Enable SSL/TLS for all listeners',
        'Use TLS 1.2 or higher',
        'Enable encryption at rest for sensitive data',
        'Use field-level encryption for PII'
    ],
    'Authorization': [
        'Set allow.everyone.if.no.acl.found=false',
        'Grant minimal required permissions',
        'Use topic-level ACLs, not wildcards',
        'Regularly audit ACLs'
    ],
    'Monitoring': [
        'Enable audit logging',
        'Monitor failed authentication attempts',
        'Alert on unauthorized access attempts',
        'Track permission changes'
    ]
}
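
Parts of this checklist can be checked automatically against a broker's properties file. The sketch below is a minimal linter under two assumptions: listener names equal their security protocol (no listener.security.protocol.map), and only the handful of settings discussed in this article are inspected. All function names are hypothetical.

```python
INSECURE_PROTOCOLS = {"PLAINTEXT", "SASL_PLAINTEXT"}
WEAK_TLS_VERSIONS = {"SSLv3", "TLSv1", "TLSv1.1"}


def parse_properties(text):
    """Parse key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def split_csv(value):
    return [item.strip() for item in value.split(",") if item.strip()]


def audit_broker_config(props):
    """Return a list of human-readable findings; an empty list means no issues found."""
    findings = []
    for listener in split_csv(props.get("listeners", "")):
        if listener.split("://", 1)[0] in INSECURE_PROTOCOLS:
            findings.append(f"listener {listener} does not use TLS")
    if "PLAIN" in split_csv(props.get("sasl.enabled.mechanisms", "")):
        findings.append("SASL/PLAIN is enabled; prefer SCRAM or GSSAPI")
    if not props.get("authorizer.class.name"):
        findings.append("no authorizer configured, so ACLs are not enforced")
    if props.get("allow.everyone.if.no.acl.found", "false").lower() == "true":
        findings.append("allow.everyone.if.no.acl.found=true permits access by default")
    weak = WEAK_TLS_VERSIONS.intersection(
        split_csv(props.get("ssl.enabled.protocols", "")))
    if weak:
        findings.append("weak TLS versions enabled: " + ", ".join(sorted(weak)))
    return findings


sample = """
# broker.properties
listeners=SASL_PLAINTEXT://0.0.0.0:9092
sasl.enabled.mechanisms=PLAIN
allow.everyone.if.no.acl.found=true
"""
for finding in audit_broker_config(parse_properties(sample)):
    print("-", finding)
```

A check like this fits well in a deployment pipeline, where it can reject a configuration before it reaches a broker.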

ℹ️Defense in Depth

Never rely on a single security layer. Combine authentication, authorization, encryption, and auditing. If one layer is compromised, the others provide protection.
