Security Best Practices in Python
Difficulty: Medium-Hard | Companies: Google, Meta, Amazon, Netflix, Stripe
Authentication and Authorization
import hashlib
import hmac
import secrets
from datetime import datetime, timedelta, timezone
from functools import wraps
from typing import Optional, Dict

import bcrypt
import jwt
from passlib.context import CryptContext
# Password Hashing
class PasswordManager:
    """Hash and check passwords through a passlib ``CryptContext``.

    The context is configured with bcrypt as its only scheme, a work
    factor of 12 rounds, and ``deprecated="auto"``.
    """

    def __init__(self):
        context_options = {
            "schemes": ["bcrypt"],
            "deprecated": "auto",
            "bcrypt__rounds": 12,
        }
        self.pwd_context = CryptContext(**context_options)

    def hash_password(self, password: str) -> str:
        """Return the hash the context produces for ``password``."""
        hashed = self.pwd_context.hash(password)
        return hashed

    def verify_password(self, plain_password: str, hashed_password: str) -> bool:
        """Return whether ``plain_password`` matches ``hashed_password``."""
        matches = self.pwd_context.verify(plain_password, hashed_password)
        return matches

    def needs_update(self, hashed_password: str) -> bool:
        """Return whether the context considers ``hashed_password`` outdated."""
        outdated = self.pwd_context.needs_update(hashed_password)
        return outdated
# JWT Token Management
class JWTManager:
    """Create and verify signed JWTs with PyJWT.

    Tokens are signed with ``secret_key`` using ``algorithm``. Access tokens
    default to a 15-minute lifetime and refresh tokens to 7 days. Every token
    carries an ``exp`` claim and a ``type`` claim (``"access"`` or
    ``"refresh"``) in addition to the caller's data.
    """

    def __init__(self, secret_key: str, algorithm: str = "HS256"):
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.access_token_expire = timedelta(minutes=15)
        self.refresh_token_expire = timedelta(days=7)

    def _encode(self, data: Dict, lifetime: timedelta, token_type: str) -> str:
        """Return a signed token holding ``data`` plus ``exp`` and ``type``."""
        claims = data.copy()
        # Use an aware UTC timestamp: datetime.utcnow() is deprecated and
        # returns a naive value that is easy to misinterpret as local time.
        expires_at = datetime.now(timezone.utc) + lifetime
        claims.update({"exp": expires_at, "type": token_type})
        return jwt.encode(claims, self.secret_key, algorithm=self.algorithm)

    def create_access_token(self, data: Dict, expires_delta: Optional[timedelta] = None) -> str:
        """Create an access token.

        Args:
            data: Claims to embed; the dict is copied, not modified.
            expires_delta: Lifetime override. A missing or zero value falls
                back to ``self.access_token_expire``.

        Returns:
            The encoded token.
        """
        lifetime = expires_delta or self.access_token_expire
        return self._encode(data, lifetime, "access")

    def create_refresh_token(self, data: Dict) -> str:
        """Create a refresh token that expires after ``self.refresh_token_expire``."""
        return self._encode(data, self.refresh_token_expire, "refresh")

    def verify_token(self, token: str, expected_type: Optional[str] = None) -> Dict:
        """Verify the signature and expiry of ``token`` and return its claims.

        Args:
            token: The encoded token.
            expected_type: When given, the token's ``type`` claim must equal
                this value (``"access"`` or ``"refresh"``). This keeps a
                refresh token from being accepted where an access token is
                required. ``None`` skips the check.

        Returns:
            The decoded payload.

        Raises:
            ValueError: ``"Token expired"`` if the token is past its ``exp``;
                ``"Invalid token"`` for any other validation failure or a
                ``type`` mismatch. The PyJWT error is chained as the cause.
        """
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
        except jwt.ExpiredSignatureError as err:
            raise ValueError("Token expired") from err
        except jwt.InvalidTokenError as err:
            raise ValueError("Invalid token") from err
        if expected_type is not None and payload.get("type") != expected_type:
            raise ValueError("Invalid token")
        return payload
# API Key Authentication
class APIKeyManager:
    """Generate, validate and revoke API keys held in an in-memory dict.

    ``self.keys`` maps each issued key string to a metadata dict with the
    entries ``user_id``, ``scopes``, ``created_at``, ``last_used`` and
    ``is_active``.
    """

    def __init__(self):
        self.keys = {}

    def generate_key(self, user_id: str, scopes: Optional[list] = None) -> str:
        """Issue a new API key for ``user_id``.

        Args:
            user_id: Owner recorded with the key.
            scopes: Scopes granted to the key. ``None`` or an empty list
                grants ``["read"]``.

        Returns:
            The new key, ``"sk_"`` followed by 64 random hex characters.
        """
        api_key = f"sk_{secrets.token_hex(32)}"
        self.keys[api_key] = {
            "user_id": user_id,
            # Copy the list so later mutation of the caller's object cannot
            # silently change what this key is allowed to do.
            "scopes": list(scopes) if scopes else ["read"],
            "created_at": datetime.now(),
            "last_used": None,
            "is_active": True,
        }
        return api_key

    def validate_key(self, api_key: str) -> Optional[Dict]:
        """Return the metadata for an active key, or ``None``.

        A successful lookup also stamps ``last_used`` with the current time.
        Unknown and revoked keys both yield ``None``.
        """
        key_info = self.keys.get(api_key)
        if key_info is None or not key_info["is_active"]:
            return None
        key_info["last_used"] = datetime.now()
        return key_info

    def revoke_key(self, api_key: str) -> bool:
        """Deactivate ``api_key``; return ``False`` if the key is unknown."""
        key_info = self.keys.get(api_key)
        if key_info is None:
            return False
        key_info["is_active"] = False
        return True
# Role-Based Access Control
class RBACManager:
    """Map users to a single role and roles to permission lists.

    Three roles are predefined: ``admin``, ``editor`` and ``viewer``.
    """

    def __init__(self):
        self.user_roles = {}
        self.roles = {
            "admin": ["read", "write", "delete", "manage_users"],
            "editor": ["read", "write"],
            "viewer": ["read"],
        }

    def assign_role(self, user_id: str, role: str):
        """Give ``user_id`` the named role, replacing any previous one.

        Raises:
            ValueError: If ``role`` is not one of the defined roles.
        """
        if role in self.roles:
            self.user_roles[user_id] = role
            return
        raise ValueError(f"Invalid role: {role}")

    def check_permission(self, user_id: str, permission: str) -> bool:
        """Return whether the user's role grants ``permission``.

        Users without an assigned role have no permissions.
        """
        if user_id in self.user_roles:
            granted = self.roles.get(self.user_roles[user_id], [])
            return permission in granted
        return False
def require_permission(permission: str, rbac=None):
    """Decorator factory that rejects calls lacking ``permission``.

    The wrapped function's user is taken from the ``current_user`` keyword
    argument, falling back to the first positional argument. The user must
    be a mapping with an ``'id'`` entry.

    Args:
        permission: Permission name the caller must hold.
        rbac: Object exposing ``check_permission(user_id, permission)``.
            Pass the application's shared manager here. When omitted, a new
            ``RBACManager()`` is created on each call, which has no role
            assignments and therefore denies every user.

    Raises (from the wrapper):
        ValueError: If no user could be found in the call arguments.
        PermissionError: If the user lacks ``permission``.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Look at the keyword first, then the first positional argument.
            # The two lookups are kept separate: written as a single
            # ``a or b if args else None`` expression, the conditional binds
            # last and discards the keyword whenever ``args`` is empty.
            current_user = kwargs.get('current_user')
            if not current_user and args:
                current_user = args[0]
            if not current_user:
                raise ValueError("No authenticated user")
            manager = rbac if rbac is not None else RBACManager()
            if not manager.check_permission(current_user['id'], permission):
                raise PermissionError(f"Missing permission: {permission}")
            return func(*args, **kwargs)
        return wrapper
    return decorator
ℹ️
Always use bcrypt or Argon2 for password hashing. Never store passwords in plain text or use weak hashing algorithms like MD5.
Input Validation and Sanitization
import re
from typing import Any, Optional
from pydantic import BaseModel, Field, validator, EmailStr
from html import escape
import bleach
class SecureInputValidator:
    """Stateless helpers for validating and sanitizing untrusted input."""

    # Compiled once; each pattern is searched case-insensitively.
    _SQL_PATTERNS = tuple(
        re.compile(pattern, re.IGNORECASE)
        for pattern in (
            r"(\b(SELECT|INSERT|UPDATE|DELETE|DROP|UNION|ALTER)\b)",
            r"(--|;|'|\"|\bOR\b\s+\b1\b\s*=\s*\b1\b)",
            r"(\bSLEEP\b\s*\()",
            r"(\bBENCHMARK\b\s*\()",
        )
    )

    @staticmethod
    def validate_email(email: str) -> bool:
        """Return whether ``email`` has the shape ``local@domain.tld``."""
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        # fullmatch, not match: with match() the trailing ``$`` also matches
        # just before a final newline, so "a@b.com\n" would be accepted.
        return re.fullmatch(pattern, email) is not None

    @staticmethod
    def sanitize_string(input_str: str, max_length: int = 1000) -> str:
        """Strip control characters and surrounding whitespace, then truncate.

        Characters in the ranges U+0000-U+001F and U+007F-U+009F are removed
        (this includes tabs and newlines). The result is at most
        ``max_length`` characters long.
        """
        without_controls = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', input_str)
        return without_controls.strip()[:max_length]

    @staticmethod
    def sanitize_html(html_content: str) -> str:
        """Clean ``html_content`` with ``bleach`` using a small allowlist.

        Only basic formatting, list and anchor tags are allowed, and anchors
        may carry ``href`` and ``title`` attributes only.
        """
        allowed_tags = ['p', 'b', 'i', 'u', 'em', 'strong', 'a', 'ul', 'ol', 'li']
        allowed_attrs = {'a': ['href', 'title']}
        return bleach.clean(html_content, tags=allowed_tags, attributes=allowed_attrs)

    @staticmethod
    def validate_sql_input(input_str: str) -> bool:
        """Return ``False`` if ``input_str`` looks like a SQL injection attempt.

        This is a keyword/character denylist: it flags SQL keywords, quotes,
        semicolons, ``--`` and ``SLEEP(``/``BENCHMARK(`` calls. It rejects
        legitimate text containing those tokens and is a heuristic only;
        it does not replace parameterized queries.
        """
        return not any(
            pattern.search(input_str)
            for pattern in SecureInputValidator._SQL_PATTERNS
        )

    @staticmethod
    def validate_file_upload(filename: str, allowed_extensions: list) -> bool:
        """Return whether ``filename`` is a bare name with an allowed extension.

        Args:
            filename: Client-supplied file name.
            allowed_extensions: Lower-case extensions without the dot.

        Returns:
            ``False`` for an empty name, a name containing ``..``, a path
            separator or a NUL byte, or an extension outside
            ``allowed_extensions``; ``True`` otherwise.
        """
        if not filename:
            return False
        # Reject anything that could escape the upload directory. The
        # extension check alone would accept "../../etc/passwd.txt".
        if any(token in filename for token in ('..', '/', '\\', '\x00')):
            return False
        ext = filename.rsplit('.', 1)[-1].lower() if '.' in filename else ''
        return ext in allowed_extensions
# Pydantic Models with Validation
class SecureUserInput(BaseModel):
    """Pydantic model for user-supplied profile data.

    ``username`` must be 3-50 characters of letters, digits and underscores;
    ``email`` is validated by ``EmailStr``; ``bio`` is optional, limited to
    500 characters, and has HTML-like tags removed.
    """

    username: str = Field(..., min_length=3, max_length=50)
    email: EmailStr
    bio: Optional[str] = Field(None, max_length=500)

    @validator('username')
    def validate_username(cls, v):
        """Reject usernames containing anything but letters, digits, ``_``."""
        # fullmatch, not match: with match() the trailing ``$`` also matches
        # before a final newline, which would let "name\n" through.
        if not re.fullmatch(r'^[a-zA-Z0-9_]+$', v):
            raise ValueError('Username can only contain letters, numbers, and underscores')
        return v

    @validator('bio')
    def validate_bio(cls, v):
        """Strip ``<...>`` tag sequences from the bio and cap it at 500 chars.

        ``None`` and empty strings are returned unchanged.
        """
        if v:
            # Remove anything that looks like an HTML tag.
            v = re.sub(r'<[^>]+>', '', v)
            # Limit length
            v = v[:500]
        return v
class SecureFileUpload(BaseModel):
    """Pydantic model describing an uploaded file's metadata.

    Validates that ``filename`` is a bare name with an allowed extension and
    that ``size`` is between 0 and 10 MB inclusive. ``content_type`` is
    stored without further checks.
    """

    filename: str
    content_type: str
    size: int

    @validator('filename')
    def validate_filename(cls, v):
        """Reject path-like names and extensions outside the allowlist.

        Raises:
            ValueError: If the name contains ``..``, a path separator or a
                NUL byte, or if its extension is not one of
                jpg, jpeg, png, gif, pdf, txt (case-insensitive).
        """
        # Check for path traversal; a NUL byte is rejected as well because
        # it is never valid inside a file name.
        if '..' in v or '/' in v or '\\' in v or '\x00' in v:
            raise ValueError('Invalid filename')
        # Check extension
        allowed = ['jpg', 'jpeg', 'png', 'gif', 'pdf', 'txt']
        ext = v.rsplit('.', 1)[-1].lower() if '.' in v else ''
        if ext not in allowed:
            raise ValueError(f'File type not allowed: {ext}')
        return v

    @validator('size')
    def validate_size(cls, v):
        """Reject negative sizes and sizes above 10 MB.

        Raises:
            ValueError: If ``v`` is negative or larger than the limit.
        """
        max_size = 10 * 1024 * 1024  # 10MB
        if v < 0:
            raise ValueError(f'Invalid file size: {v}')
        if v > max_size:
            raise ValueError(f'File too large: {v} bytes (max: {max_size})')
        return v
Encryption and Cryptography
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import serialization
import base64
import os
class EncryptionManager:
    """Symmetric encryption of strings using Fernet.

    Args:
        key: A Fernet key. When omitted, a new key is generated; the caller
            then has no copy of it, so data encrypted by that instance can
            only be decrypted by the same instance.
    """

    def __init__(self, key: Optional[bytes] = None):
        if key is None:
            key = Fernet.generate_key()
        self.cipher = Fernet(key)

    def encrypt(self, data: str) -> str:
        """Encrypt ``data`` and return the Fernet token as text."""
        return self.cipher.encrypt(data.encode()).decode()

    def decrypt(self, encrypted_data: str) -> str:
        """Decrypt a token produced by :meth:`encrypt` and return the text."""
        return self.cipher.decrypt(encrypted_data.encode()).decode()

    @staticmethod
    def derive_key_from_password(
        password: str,
        salt: Optional[bytes] = None,
        iterations: int = 100000,
    ) -> tuple:
        """Derive a Fernet-compatible key from ``password`` with PBKDF2-HMAC-SHA256.

        Args:
            password: The password to stretch.
            salt: Salt bytes. A random 16-byte salt is generated when omitted.
            iterations: PBKDF2 iteration count. The default is unchanged so
                keys derived earlier can be reproduced; choose a higher
                value for new data and store it alongside the salt.

        Returns:
            ``(key, salt)`` where ``key`` is the url-safe base64 encoding of
            the 32 derived bytes. Keep ``salt`` (and ``iterations``) to
            derive the same key again.
        """
        if salt is None:
            salt = os.urandom(16)
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=iterations,
        )
        key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
        return key, salt
class RSAEncryption:
    """RSA-OAEP encryption backed by a key pair generated per instance.

    Each instance creates its own 2048-bit private key (public exponent
    65537) and keeps the matching public key next to it.
    """

    def __init__(self):
        key_pair = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
        )
        self.private_key = key_pair
        self.public_key = self.private_key.public_key()

    @staticmethod
    def _oaep():
        """Build the OAEP padding (SHA-256 for hash and MGF1, no label)."""
        mask_function = padding.MGF1(algorithm=hashes.SHA256())
        return padding.OAEP(
            mgf=mask_function,
            algorithm=hashes.SHA256(),
            label=None,
        )

    def encrypt(self, data: bytes) -> bytes:
        """Return ``data`` encrypted under the public key."""
        oaep = self._oaep()
        return self.public_key.encrypt(data, oaep)

    def decrypt(self, encrypted_data: bytes) -> bytes:
        """Return the plaintext recovered with the private key."""
        oaep = self._oaep()
        return self.private_key.decrypt(encrypted_data, oaep)

    def export_public_key(self) -> str:
        """Return the public key as PEM text (SubjectPublicKeyInfo)."""
        pem_bytes = self.public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )
        return pem_bytes.decode()
# Digital Signatures
class DigitalSignature:
    """RSA-PSS signing and verification with a per-instance key pair.

    Each instance generates a 2048-bit private key (public exponent 65537)
    and derives the public key from it.
    """

    def __init__(self):
        key_pair = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
        )
        self.private_key = key_pair
        self.public_key = self.private_key.public_key()

    @staticmethod
    def _pss():
        """Build the PSS padding (MGF1 over SHA-256, maximum salt length)."""
        mask_function = padding.MGF1(hashes.SHA256())
        return padding.PSS(
            mgf=mask_function,
            salt_length=padding.PSS.MAX_LENGTH,
        )

    def sign(self, data: bytes) -> bytes:
        """Return the SHA-256/PSS signature of ``data``."""
        pss = self._pss()
        return self.private_key.sign(data, pss, hashes.SHA256())

    def verify(self, data: bytes, signature: bytes) -> bool:
        """Return ``True`` if ``signature`` verifies for ``data``.

        Any exception raised during verification is reported as ``False``.
        """
        try:
            pss = self._pss()
            self.public_key.verify(signature, data, pss, hashes.SHA256())
        except Exception:
            return False
        else:
            return True
⚠️
Never implement your own cryptography. Use well-established libraries like
cryptography (pyca/cryptography).
Secure Configuration Management
import os
from typing import Optional
from dataclasses import dataclass
from pathlib import Path
import json
@dataclass
class SecurityConfig:
    """Security-related application settings.

    Only ``jwt_secret_key`` is required. The list-valued fields default to
    ``None``.
    """

    # JWT Settings
    jwt_secret_key: str
    jwt_algorithm: str = "HS256"
    jwt_access_token_expire_minutes: int = 15
    jwt_refresh_token_expire_days: int = 7
    # Password Settings
    password_min_length: int = 8
    password_require_uppercase: bool = True
    password_require_numbers: bool = True
    password_require_special: bool = True
    # Rate Limiting
    rate_limit_requests: int = 100
    rate_limit_window_seconds: int = 60
    # CORS Settings
    cors_origins: Optional[list] = None
    cors_methods: Optional[list] = None
    # File Upload Settings
    max_upload_size_mb: int = 10
    allowed_file_types: Optional[list] = None

    @staticmethod
    def _split_csv(raw: str) -> list:
        """Split a comma-separated string, trimming and dropping empty items."""
        return [item.strip() for item in raw.split(',') if item.strip()]

    @classmethod
    def from_env(cls) -> 'SecurityConfig':
        """Build a configuration from environment variables.

        Reads ``JWT_SECRET_KEY``, ``CORS_ORIGINS`` (default ``*``) and
        ``ALLOWED_FILE_TYPES`` (default ``jpg,png,pdf``). The two list
        variables are comma-separated; whitespace around items is ignored.
        All other fields keep their defaults.

        If ``JWT_SECRET_KEY`` is unset or empty, a random key is generated.
        That key differs on every call, so tokens signed with it do not
        survive a restart; set the variable explicitly in production.
        """
        return cls(
            # ``or`` rather than a getenv default: an empty variable must
            # not become an empty signing key.
            jwt_secret_key=os.getenv('JWT_SECRET_KEY') or secrets.token_hex(32),
            cors_origins=cls._split_csv(os.getenv('CORS_ORIGINS', '*')),
            allowed_file_types=cls._split_csv(
                os.getenv('ALLOWED_FILE_TYPES', 'jpg,png,pdf')
            ),
        )
class SecretsManager:
    """Keep named secrets in memory and persist them as a JSON file.

    Secrets are stored unencrypted, both in ``self.secrets`` and on disk.
    """

    def __init__(self):
        self.secrets = {}

    def set_secret(self, name: str, value: str):
        """Store ``value`` under ``name``, replacing any existing entry."""
        # In production, use a secure vault like HashiCorp Vault
        self.secrets[name] = value

    def get_secret(self, name: str) -> Optional[str]:
        """Return the secret stored under ``name``, or ``None`` if absent."""
        return self.secrets.get(name)

    def load_from_file(self, filepath: str):
        """Replace all secrets with the JSON object read from ``filepath``.

        Raises:
            ValueError: If the file's top-level JSON value is not an object.
        """
        with open(filepath, 'r', encoding='utf-8') as f:
            loaded = json.load(f)
        # Anything but a dict would break get_secret()/set_secret() later.
        if not isinstance(loaded, dict):
            raise ValueError("Secrets file must contain a JSON object")
        self.secrets = loaded

    def save_to_file(self, filepath: str):
        """Write all secrets to ``filepath`` as indented JSON.

        A newly created file gets mode ``0o600`` so only the owner can read
        it. The mode of a file that already exists is left as it is.
        """
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
        fd = os.open(filepath, flags, 0o600)
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            json.dump(self.secrets, f, indent=2)
# Security Headers Middleware
class SecurityHeadersMiddleware:
    """WSGI middleware that adds security headers to every response.

    A header is added only when the wrapped application has not already set
    one with the same name (compared case-insensitively), so an application
    can override any individual default.
    """

    _SECURITY_HEADERS = (
        ('X-Content-Type-Options', 'nosniff'),
        ('X-Frame-Options', 'DENY'),
        ('X-XSS-Protection', '1; mode=block'),
        ('Strict-Transport-Security', 'max-age=31536000; includeSubDomains'),
        ('Content-Security-Policy', "default-src 'self'"),
        ('Referrer-Policy', 'strict-origin-when-cross-origin'),
    )

    def __init__(self, app):
        self.app = app

    def __call__(self, environ, start_response):
        """Call the wrapped app with a ``start_response`` that adds headers."""
        def custom_start_response(status, headers, exc_info=None):
            already_set = {name.lower() for name, _ in headers}
            # Build a new list instead of extending the app's own list, and
            # skip names the app set itself: a duplicated header such as
            # Content-Security-Policy would otherwise be sent twice.
            merged = list(headers)
            merged.extend(
                (name, value)
                for name, value in self._SECURITY_HEADERS
                if name.lower() not in already_set
            )
            return start_response(status, merged, exc_info)
        return self.app(environ, custom_start_response)
Security Testing
import pytest
from unittest.mock import Mock
class SecurityTestSuite:
    """Checks that ``SecureInputValidator`` rejects common attack inputs.

    Each method is a static check that raises ``AssertionError`` on failure.
    """

    @staticmethod
    def test_sql_injection():
        """Every sample injection string must fail ``validate_sql_input``."""
        checker = SecureInputValidator()
        # Malicious inputs
        malicious_inputs = [
            "'; DROP TABLE users; --",
            "1' OR '1'='1",
            "admin'--",
            "1; SELECT * FROM users",
        ]
        for malicious_input in malicious_inputs:
            assert not checker.validate_sql_input(malicious_input)

    @staticmethod
    def test_xss_prevention():
        """Sanitized output must contain no script tag or javascript: URL."""
        checker = SecureInputValidator()
        xss_payloads = [
            "<script>alert('xss')</script>",
            "<img src=x onerror=alert('xss')>",
            # A javascript: URL is only dangerous inside an attribute such
            # as href. As bare text the sanitizer has nothing to remove, so
            # the assertion below could never hold for it.
            "<a href=\"javascript:alert('xss')\">click</a>",
        ]
        for payload in xss_payloads:
            sanitized = checker.sanitize_html(payload)
            assert "<script>" not in sanitized
            assert "javascript:" not in sanitized

    @staticmethod
    def test_path_traversal():
        """Path-like file names must fail ``validate_file_upload``."""
        checker = SecureInputValidator()
        malicious_filenames = [
            "../../../etc/passwd",
            "..\\..\\windows\\system32",
            "test/../../../secret",
        ]
        for filename in malicious_filenames:
            assert not checker.validate_file_upload(filename, ['txt', 'pdf'])
# Security headers check
def test_security_headers(client):
    """Check that ``GET /`` responds with the expected security headers."""
    headers = client.get('/').headers
    expected_values = {
        'X-Content-Type-Options': 'nosniff',
        'X-Frame-Options': 'DENY',
    }
    for name, value in expected_values.items():
        assert headers.get(name) == value
    # For HSTS only the header's presence is checked, not its value.
    assert 'Strict-Transport-Security' in headers
ℹ️
Security is a continuous process. Regularly update dependencies, scan for vulnerabilities, and conduct security audits.
Follow-Up Questions
- Explain the difference between authentication and authorization.
- How do you protect against common web vulnerabilities (OWASP Top 10)?
- What are the best practices for storing secrets in production?
- How do you implement rate limiting to prevent abuse?
- Explain the principle of least privilege.