πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Kafka Data Governance: Retention, Tiered Storage, Audit

Apache KafkaData Governance⭐ Premium

Advertisement

Kafka Data Governance: Retention, Tiered Storage, Audit

Difficulty: Senior | Asked at: Finance, Healthcare, Enterprise companies

ℹ️Interview Context

Data governance is critical for regulated industries. Interviewers expect you to understand retention strategies, tiered storage, audit logging, and compliance requirements.

The Question

How do you implement data governance in Kafka? Explain retention policies, tiered storage, and audit logging strategies for compliance.

Data Governance Framework

Architecture Diagram
Kafka Data Governance Components:
1. Data Classification - What data do we have?
2. Retention Policies - How long to keep data?
3. Access Control - Who can access data?
4. Encryption - Protect sensitive data
5. Audit Logging - Track all access
6. Compliance - Meet regulatory requirements

Retention Policies

Retention Configuration

# Topic-level retention configuration
retention_configs = {
    # Time-based retention
    'retention.ms': 604800000,  # 7 days
    
    # Size-based retention
    'retention.bytes': 1073741824,  # 1GB per partition
    
    # Delete vs compact
    'cleanup.policy': 'delete',  # or 'compact' or 'compact,delete'
    
    # Segment configuration
    'segment.ms': 604800000,  # 7 days
    'segment.bytes': 1073741824,  # 1GB
    
    # Minimum cleanable dirty ratio
    'min.cleanable.dirty.ratio': 0.5
}

# Create topic with retention
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

topic = NewTopic(
    name='user-events',
    num_partitions=12,
    replication_factor=3,
    topic_configs={
        'retention.ms': 2592000000,  # 30 days
        'retention.bytes': -1,  # Unlimited size
        'cleanup.policy': 'delete'
    }
)

admin.create_topics([topic])

Retention Strategies

retention_strategies = {
    'time_based': {
        'description': 'Delete data after fixed time period',
        'config': 'retention.ms',
        'use_case': 'Log data, events with time relevance',
        'example': 'retention.ms=604800000 (7 days)'
    },
    'size_based': {
        'description': 'Delete oldest data when size exceeds limit',
        'config': 'retention.bytes',
        'use_case': 'Storage-constrained environments',
        'example': 'retention.bytes=1073741824 (1GB per partition)'
    },
    'compact': {
        'description': 'Keep only latest value per key',
        'config': 'cleanup.policy=compact',
        'use_case': 'State stores, configuration topics',
        'example': 'cleanup.policy=compact'
    },
    'compact_delete': {
        'description': 'Compact with tombstone deletion',
        'config': 'cleanup.policy=compact,delete',
        'use_case': 'State stores with deletion',
        'example': 'cleanup.policy=compact,delete'
    }
}

def calculate_retention_size(
    messages_per_day: int,
    avg_message_size: int,
    retention_days: int,
    num_partitions: int
) -> dict:
    """
    Calculate storage requirements for retention policy.
    """
    # Daily data per partition
    daily_per_partition = (messages_per_day * avg_message_size) / num_partitions
    
    # Total per partition
    total_per_partition = daily_per_partition * retention_days
    
    # Total across partitions
    total_bytes = total_per_partition * num_partitions
    
    return {
        'daily_per_partition_gb': daily_per_partition / (1024**3),
        'total_per_partition_gb': total_per_partition / (1024**3),
        'total_gb': total_bytes / (1024**3),
        'total_tb': total_bytes / (1024**4)
    }

# Example
retention = calculate_retention_size(
    messages_per_day=100000000,
    avg_message_size=500,
    retention_days=30,
    num_partitions=12
)
print(f"Total storage: {retention['total_tb']:.2f} TB")

⚠️Retention Warning

Setting retention too short may lose valuable data. Setting too long increases storage costs. Always consider:

  1. Business requirements
  2. Regulatory compliance
  3. Storage costs
  4. Query patterns

Tiered Storage

How Tiered Storage Works

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚              Tiered Storage Architecture                  β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                         β”‚
β”‚  Hot Tier (Local Disk)                                  β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚  Recent data (last 7 days)                     β”‚   β”‚
β”‚  β”‚  - Fast access                                 β”‚   β”‚
β”‚  β”‚  - Full read/write performance                 β”‚   β”‚
β”‚  β”‚  - Expensive storage                           β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                          β”‚                               β”‚
β”‚                          ↓ After retention period        β”‚
β”‚  Warm Tier (S3/GCS/Azure Blob)                         β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚  Older data (7-90 days)                        β”‚   β”‚
β”‚  β”‚  - Slower access                               β”‚   β”‚
β”‚  β”‚  - Read-only (mostly)                          β”‚   β”‚
β”‚  β”‚  - Medium cost                                 β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                          β”‚                               β”‚
β”‚                          ↓ After extended period         β”‚
β”‚  Cold Tier (Glacier/Archive)                            β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚  Archived data (90+ days)                      β”‚   β”‚
β”‚  β”‚  - Very slow access                            β”‚   β”‚
β”‚  β”‚  - Read-only                                   β”‚   β”‚
β”‚  β”‚  - Cheapest storage                            β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Tiered Storage Configuration

# Confluent Tiered Storage configuration
tiered_storage_config = {
    # Enable tiered storage
    'confluent.tiered.enable': True,
    
    # Topic-level configuration
    'confluent.tiered.topic.level.enable': True,
    
    # Storage policies
    'confluent.tiered.archiver.max.bytes.per.segment': 536870912,  # 512MB
    'confluent.tiered.cleanup.policy': 'delete',
    
    # Remote storage settings
    'confluent.tiered.remote.storage.manager.class.name': 'io.confluent.server.redpanda.S3RemoteResourceManager',
    'confluent.tiered.remote.storage.manager.s3.bucket.name': 'kafka-tiered-storage',
    'confluent.tiered.remote.storage.manager.s3.region': 'us-east-1',
    
    # Hot tier retention
    'confluent.tiered.local.retention.ms': 604800000,  # 7 days
    
    # Performance tuning
    'confluent.tiered.fetch.request.timeout.ms': 30000,
    'confluent.tiered.segment.upload.timeout.ms': 60000
}

# Create topic with tiered storage
topic = NewTopic(
    name='events-tiered',
    num_partitions=12,
    replication_factor=3,
    topic_configs={
        'retention.ms': 31536000000,  # 1 year total
        'confluent.tiered.enable': True,
        'confluent.tiered.local.retention.ms': 604800000,  # 7 days hot
        'confluent.tiered.archiver.max.bytes.per.segment': 536870912
    }
)

Tiered Storage Cost Savings

def calculate_tiered_storage_savings(
    total_data_tb: float,
    hot_tier_days: int,
    total_retention_days: int,
    hot_storage_cost_per_gb: float,
    warm_storage_cost_per_gb: float,
    cold_storage_cost_per_gb: float
) -> dict:
    """
    Calculate cost savings from tiered storage.
    """
    # Calculate data distribution
    hot_fraction = hot_tier_days / total_retention_days
    warm_fraction = 0.5 * (1 - hot_fraction)  # Half of remaining in warm
    cold_fraction = 1 - hot_fraction - warm_fraction
    
    # Calculate storage in each tier
    hot_tb = total_data_tb * hot_fraction
    warm_tb = total_data_tb * warm_fraction
    cold_tb = total_data_tb * cold_fraction
    
    # Calculate costs
    hot_cost = hot_tb * 1024 * hot_storage_cost_per_gb
    warm_cost = warm_tb * 1024 * warm_storage_cost_per_gb
    cold_cost = cold_tb * 1024 * cold_storage_cost_per_gb
    
    # Total tiered cost
    tiered_cost = hot_cost + warm_cost + cold_cost
    
    # Traditional cost (all hot)
    traditional_cost = total_data_tb * 1024 * hot_storage_cost_per_gb
    
    # Savings
    savings = traditional_cost - tiered_cost
    savings_percent = (savings / traditional_cost) * 100
    
    return {
        'hot_storage_tb': hot_tb,
        'warm_storage_tb': warm_tb,
        'cold_storage_tb': cold_tb,
        'traditional_monthly_cost': traditional_cost,
        'tiered_monthly_cost': tiered_cost,
        'monthly_savings': savings,
        'savings_percent': savings_percent
    }

# Example
savings = calculate_tiered_storage_savings(
    total_data_tb=10,
    hot_tier_days=7,
    total_retention_days=365,
    hot_storage_cost_per_gb=0.10,
    warm_storage_cost_per_gb=0.025,
    cold_storage_cost_per_gb=0.004
)
print(f"Traditional cost: ${savings['traditional_monthly_cost']:,.0f}/month")
print(f"Tiered cost: ${savings['tiered_monthly_cost']:,.0f}/month")
print(f"Savings: ${savings['monthly_savings']:,.0f}/month ({savings['savings_percent']:.1f}%)")

ℹ️Tiered Storage Benefit

Tiered storage can reduce costs by 60-80% for long retention periods. The trade-off is slower access to older data. Use for:

  1. Compliance data requiring long retention
  2. Historical analysis workloads
  3. Disaster recovery backups

Audit Logging

Audit Event Schema

{
  "event_id": "uuid-1234",
  "timestamp": "2024-01-15T10:30:00Z",
  "event_type": "topic.produce",
  "actor": {
    "type": "user",
    "id": "alice@example.com",
    "ip": "192.168.1.100",
    "user_agent": "kafka-producer/3.6.1"
  },
  "resource": {
    "type": "topic",
    "name": "user-events",
    "cluster": "production"
  },
  "action": {
    "type": "write",
    "records_count": 100,
    "bytes_written": 50000
  },
  "result": {
    "status": "success",
    "partition_offsets": {
      "0": 12345,
      "1": 67890
    }
  },
  "metadata": {
    "request_id": "req-5678",
    "duration_ms": 45,
    "schema_id": 42
  }
}

Audit Logger Implementation

class KafkaAuditLogger:
    """
    Audit logger for Kafka operations.
    """
    
    def __init__(self, audit_topic, producer):
        self.audit_topic = audit_topic
        self.producer = producer
    
    def log_produce(self, user, topic, records_count, bytes_written, result):
        """Log produce operation"""
        audit_event = {
            'event_id': str(uuid.uuid4()),
            'timestamp': datetime.utcnow().isoformat(),
            'event_type': 'topic.produce',
            'actor': {
                'type': 'user',
                'id': user,
                'ip': self._get_client_ip(),
                'user_agent': self._get_user_agent()
            },
            'resource': {
                'type': 'topic',
                'name': topic,
                'cluster': self._get_cluster_name()
            },
            'action': {
                'type': 'write',
                'records_count': records_count,
                'bytes_written': bytes_written
            },
            'result': {
                'status': 'success' if result else 'failure',
                'details': result
            },
            'metadata': {
                'timestamp_ms': int(time.time() * 1000)
            }
        }
        
        self.producer.send(self.audit_topic, value=audit_event)
    
    def log_consume(self, user, topic, group, records_count, result):
        """Log consume operation"""
        audit_event = {
            'event_id': str(uuid.uuid4()),
            'timestamp': datetime.utcnow().isoformat(),
            'event_type': 'topic.consume',
            'actor': {
                'type': 'user',
                'id': user,
                'ip': self._get_client_ip()
            },
            'resource': {
                'type': 'topic',
                'name': topic,
                'cluster': self._get_cluster_name()
            },
            'action': {
                'type': 'read',
                'group': group,
                'records_count': records_count
            },
            'result': {
                'status': 'success' if result else 'failure'
            }
        }
        
        self.producer.send(self.audit_topic, value=audit_event)
    
    def log_admin(self, user, operation, resource, result):
        """Log administrative operation"""
        audit_event = {
            'event_id': str(uuid.uuid4()),
            'timestamp': datetime.utcnow().isoformat(),
            'event_type': f'admin.{operation}',
            'actor': {
                'type': 'user',
                'id': user,
                'ip': self._get_client_ip()
            },
            'resource': resource,
            'action': {
                'type': operation
            },
            'result': {
                'status': 'success' if result else 'failure',
                'details': result
            }
        }
        
        self.producer.send(self.audit_topic, value=audit_event)

Audit Retention

audit_retention_config = {
    'topic': 'audit-events',
    'retention_days': 365,  # 1 year for compliance
    'cleanup_policy': 'delete',
    
    # Tiered storage for cost optimization
    'tiered_storage': True,
    'hot_tier_days': 30,
    
    # Compression for audit logs
    'compression_type': 'zstd',
    
    # Replication for durability
    'replication_factor': 3,
    'min_insync_replicas': 2,
    
    # Encryption
    'encryption_at_rest': True,
    'encryption_in_transit': True
}

⚠️Audit Requirements

Many compliance frameworks require audit logging:

  • PCI DSS: Log all access to cardholder data
  • HIPAA: Log all access to PHI
  • SOC 2: Log all administrative actions
  • GDPR: Log data processing activities

Compliance Strategies

Data Classification

data_classification = {
    'Public': {
        'description': 'Data publicly available',
        'retention': 'Business need',
        'encryption': 'Optional',
        'audit': 'Minimal'
    },
    'Internal': {
        'description': 'Internal business data',
        'retention': '7 years',
        'encryption': 'In transit',
        'audit': 'Administrative actions'
    },
    'Confidential': {
        'description': 'Sensitive business data',
        'retention': '7 years',
        'encryption': 'In transit and at rest',
        'audit': 'All access'
    },
    'Restricted': {
        'description': 'PII, PHI, PCI data',
        'retention': 'As required by regulation',
        'encryption': 'In transit and at rest (strong)',
        'audit': 'All access with details'
    }
}

# Classify topics
topic_classification = {
    'user-events': 'Confidential',
    'payment-events': 'Restricted',
    'public-announcements': 'Public',
    'internal-metrics': 'Internal'
}

Compliance Checklist

compliance_checklist = {
    'GDPR': [
        'Right to erasure (tombstones)',
        'Data minimization (retention policies)',
        'Purpose limitation (topic access controls)',
        'Data portability (export capabilities)'
    ],
    'PCI_DSS': [
        'Encrypt cardholder data at rest',
        'Encrypt cardholder data in transit',
        'Restrict access to cardholder data',
        'Log all access to cardholder data',
        'Regular security testing'
    ],
    'HIPAA': [
        'Encrypt PHI at rest',
        'Encrypt PHI in transit',
        'Audit all access to PHI',
        'Business Associate Agreements',
        'Breach notification procedures'
    ],
    'SOC_2': [
        'Access controls',
        'Audit logging',
        'Change management',
        'Incident response',
        'Data retention policies'
    ]
}

def assess_compliance(compliance_framework, kafka_config):
    """
    Assess Kafka configuration against compliance requirements.
    """
    requirements = compliance_checklist.get(compliance_framework, [])
    assessment = []
    
    for requirement in requirements:
        status = check_requirement(requirement, kafka_config)
        assessment.append({
            'requirement': requirement,
            'status': status,
            'recommendation': get_recommendation(requirement, status)
        })
    
    return assessment

def check_requirement(requirement, config):
    """Check if requirement is met"""
    # Simplified check logic
    if 'encrypt' in requirement.lower():
        return 'met' if config.get('encryption') else 'not_met'
    elif 'audit' in requirement.lower():
        return 'met' if config.get('audit_logging') else 'not_met'
    elif 'retention' in requirement.lower():
        return 'met' if config.get('retention_policy') else 'not_met'
    return 'needs_review'

Data Lineage

class DataLineageTracker:
    """
    Track data lineage across Kafka topics.
    """
    
    def __init__(self):
        self.lineage = {}
    
    def record_transformation(self, source_topic, target_topic, transformation):
        """
        Record data transformation.
        
        Tracks how data flows through the system.
        """
        if target_topic not in self.lineage:
            self.lineage[target_topic] = {
                'sources': [],
                'transformations': [],
                'consumers': []
            }
        
        self.lineage[target_topic]['sources'].append(source_topic)
        self.lineage[target_topic]['transformations'].append({
            'type': transformation,
            'timestamp': int(time.time() * 1000)
        })
    
    def get_lineage(self, topic):
        """Get lineage for a topic"""
        return self.lineage.get(topic, {})
    
    def visualize_lineage(self):
        """Generate lineage visualization"""
        for topic, info in self.lineage.items():
            print(f"Topic: {topic}")
            print(f"  Sources: {info['sources']}")
            print(f"  Transformations: {len(info['transformations'])}")
            print()

ℹ️Data Lineage

Data lineage helps you understand:

  1. Where data comes from
  2. How it's transformed
  3. Where it goes
  4. Who uses it

This is critical for debugging, compliance, and data governance.

Governance Metrics

governance_metrics = {
    'Data Quality': {
        'schema_compliance_rate': 'Percentage of messages matching schema',
        'data_freshness': 'Age of newest message',
        'completeness': 'Percentage of required fields present'
    },
    'Retention': {
        'topics_exceeding_retention': 'Topics with data beyond retention',
        'storage_utilization': 'Storage usage vs capacity',
        'deletion_rate': 'Data deleted per day'
    },
    'Access Control': {
        'unauthorized_access_attempts': 'Failed authorization checks',
        'privileged_user_count': 'Users with admin access',
        'acl_coverage': 'Percentage of topics with ACLs'
    },
    'Compliance': {
        'audit_log_completeness': 'Percentage of operations logged',
        'encryption_coverage': 'Percentage of topics encrypted',
        'retention_compliance': 'Topics meeting retention requirements'
    }
}

def calculate_governance_score(metrics):
    """
    Calculate overall governance score.
    """
    weights = {
        'schema_compliance_rate': 0.3,
        'audit_log_completeness': 0.25,
        'encryption_coverage': 0.25,
        'acl_coverage': 0.2
    }
    
    score = 0
    for metric, weight in weights.items():
        value = metrics.get(metric, 0)
        score += value * weight
    
    return score * 100

⚠️Key Insight

Data governance is not just about technology. It requires:

  1. Clear policies and procedures
  2. Training and awareness
  3. Regular audits and reviews
  4. Continuous improvement
  5. Executive sponsorship

Advertisement