Data Security & Encryption: Compliance and Protection
Difficulty: Senior Level | Companies: Stripe, PayPal, Amazon, Google, Microsoft
1. Data Security Layers
Architecture Diagram
Security Stack:
βββ Network Security
β βββ VPC, Security Groups, NACLs
β βββ Private Endpoints
β βββ TLS/SSL
βββ Access Control
β βββ IAM Roles & Policies
β βββ Column-Level Security
β βββ Row-Level Security
βββ Encryption
β βββ At Rest (AES-256)
β βββ In Transit (TLS 1.3)
β βββ Column-Level Encryption
βββ Data Masking
β βββ Dynamic Data Masking
β βββ Static Data Masking
β βββ Tokenization
βββ Audit & Compliance
βββ Access Logs
βββ Data Lineage
βββ GDPR/CCPA Tools
2. Encryption Strategies
At-Rest Encryption
import hashlib
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
class EncryptionManager:
def __init__(self, master_key: bytes):
self.master_key = master_key
def generate_data_key(self) -> bytes:
"""Generate a unique data encryption key"""
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=self.master_key[:16],
iterations=480000,
)
key = base64.urlsafe_b64encode(kdf.derive(self.master_key))
return key
def encrypt_column(self, plaintext: str, data_key: bytes) -> dict:
"""Encrypt a single column value"""
f = Fernet(data_key)
encrypted = f.encrypt(plaintext.encode())
return {
"ciphertext": encrypted.decode(),
"key_id": hashlib.sha256(data_key).hexdigest()[:16],
"algorithm": "AES-256-GCM",
}
def decrypt_column(self, encrypted_data: dict, data_key: bytes) -> str:
"""Decrypt a single column value"""
f = Fernet(data_key)
return f.decrypt(encrypted_data["ciphertext"].encode()).decode()
Column-Level Encryption in Spark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
spark = SparkSession.builder.appName("ColumnEncryption").getOrCreate()
# Register UDF for encryption
@udf(returnType=StringType())
def encrypt_pii(value, key):
if value is None:
return None
encryptor = EncryptionManager(key.encode())
data_key = encryptor.generate_data_key()
encrypted = encryptor.encrypt_column(value, data_key)
return json.dumps(encrypted)
@udf(returnType=StringType())
def decrypt_pii(encrypted_json, key):
if encrypted_json is None:
return None
encryptor = EncryptionManager(key.encode())
data = json.loads(encrypted_json)
return encryptor.decrypt_column(data, key.encode())
# Apply column-level encryption
encrypted_df = df.withColumn(
"email_encrypted",
encrypt_pii(F.col("email"), F.lit(MASTER_KEY))
)
# Write encrypted data
encrypted_df.write.format("parquet").save("s3://secure-data/users/")
3. Access Control Patterns
Row-Level Security
class RowLevelSecurity:
def __init__(self):
self.policies = {}
def add_policy(self, table: str, role: str, filter_condition: str):
"""Add row-level security policy"""
if table not in self.policies:
self.policies[table] = []
self.policies[table].append({
"role": role,
"filter": filter_condition
})
def apply_policy(self, table_name: str, user_role: str, query: str) -> str:
"""Inject filter into query based on user role"""
policies = self.policies.get(table_name, [])
filters = []
for policy in policies:
if policy["role"] == user_role or policy["role"] == "*":
filters.append(policy["filter"])
if filters:
where_clause = " AND ".join(filters)
if "WHERE" in query.upper():
query = f"{query} AND ({where_clause})"
else:
query = f"{query} WHERE {where_clause}"
return query
# Usage
rls = RowLevelSecurity()
rls.add_policy("sales", "region_manager", "region = CURRENT_REGION()")
rls.add_policy("sales", "analyst", "1=1") # analysts see all
rls.add_policy("hr_data", "employee", "employee_id = CURRENT_USER_ID()")
secured_query = rls.apply_policy("sales", "region_manager",
"SELECT * FROM sales WHERE date >= '2024-01-01'")
# Result: SELECT * FROM sales WHERE date >= '2024-01-01' AND (region = CURRENT_REGION())
Column-Level Security
class ColumnLevelSecurity:
def __init__(self):
self.classifications = {} # column -> classification
self.role_permissions = {} # role -> allowed classifications
def classify_column(self, table: str, column: str, classification: str):
key = f"{table}.{column}"
self.classifications[key] = classification
def grant_access(self, role: str, classifications: list):
self.role_permissions[role] = set(classifications)
def filter_columns(self, table: str, role: str, columns: list) -> list:
allowed_classifications = self.role_permissions.get(role, set())
filtered = []
for col in columns:
key = f"{table}.{col}"
classification = self.classifications.get(key, "public")
if classification in allowed_classifications or role == "admin":
filtered.append(col)
else:
filtered.append(f"NULL AS {col}") # Replace with NULL
return filtered
# Usage
cls = ColumnLevelSecurity()
cls.classify_column("users", "email", "pii")
cls.classify_column("users", "ssn", "highly_sensitive")
cls.classify_column("users", "name", "internal")
cls.grant_access("analyst", ["internal", "public"])
cls.grant_access("hr_team", ["pii", "highly_sensitive", "internal", "public"])
filtered = cls.filter_columns("users", "analyst", ["name", "email", "ssn"])
# Result: ["name", "NULL AS email", "NULL AS ssn"]
4. GDPR & CCPA Compliance
Right to Erasure (Right to be Forgotten)
class GDPRHandler:
def __init__(self, spark, data_catalog):
self.spark = spark
self.catalog = data_catalog
def handle_deletion_request(self, user_id: str) -> dict:
"""Process a GDPR deletion request"""
affected_datasets = self.catalog.find_datasets_with_entity(user_id)
results = {"deleted": [], "anonymized": [], "errors": []}
for dataset in affected_datasets:
try:
if dataset.deletion_possible:
# Hard delete from table
self.spark.sql(f"""
DELETE FROM {dataset.fully_qualified}
WHERE user_id = '{user_id}'
""")
results["deleted"].append(dataset.name)
else:
# Anonymize instead of delete (for referential integrity)
self.spark.sql(f"""
UPDATE {dataset.fully_qualified}
SET user_id = 'ANONYMOUS',
email = NULL,
name = 'DELETED_USER'
WHERE user_id = '{user_id}'
""")
results["anonymized"].append(dataset.name)
except Exception as e:
results["errors"].append({"dataset": dataset.name, "error": str(e)})
return results
def export_user_data(self, user_id: str) -> dict:
"""Export all data for a user (Right to Portability)"""
all_data = {}
for dataset in self.catalog.find_datasets_with_entity(user_id):
data = self.spark.sql(f"""
SELECT * FROM {dataset.fully_qualified}
WHERE user_id = '{user_id}'
""").toPandas().to_dict(orient="records")
all_data[dataset.name] = data
return all_data
5. Key Management
class KeyManager:
"""AWS KMS key management"""
def __init__(self, kms_client):
self.kms = kms_client
def create_data_key(self, key_id: str) -> dict:
"""Generate data encryption key"""
response = self.kms.generate_data_key(
KeyId=key_id,
KeySpec='AES_256'
)
return {
"plaintext_key": response['Plaintext'],
"encrypted_key": response['CiphertextBlob'],
}
def encrypt_key(self, key_id: str, data_key: bytes) -> bytes:
"""Encrypt a data key"""
response = self.kms.encrypt(
KeyId=key_id,
Plaintext=data_key
)
return response['CiphertextBlob']
def decrypt_key(self, encrypted_key: bytes) -> bytes:
"""Decrypt a data key"""
response = self.kms.decrypt(
CiphertextBlob=encrypted_key
)
return response['Plaintext']
def rotate_key(self, key_id: str):
"""Rotate encryption key"""
self.kms.enable_key_rotation(KeyId=key_id)
βΉοΈ
Best Practice: Use envelope encryption β encrypt data with a data key, then encrypt the data key with a master key. This limits the exposure if a single key is compromised.
Follow-Up Questions
- How would you implement end-to-end encryption for a data pipeline?
- Design a GDPR-compliant data deletion system for a data lake.
- How do you handle encryption key rotation without downtime?
- Design a data masking solution for development environments.
- How would you audit data access for compliance purposes?