Production Python Patterns
Difficulty: Hard | Companies: Google, Meta, Amazon, Netflix, Stripe
Logging Best Practices
import logging
import json
import sys
from datetime import datetime
from typing import Any, Dict
from contextvars import ContextVar
import uuid
# Structured Logging
class StructuredFormatter(logging.Formatter):
"""JSON structured log formatter."""
def format(self, record: logging.LogRecord) -> str:
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno,
}
if record.exc_info:
log_data["exception"] = self.formatException(record.exc_info)
# Add custom fields
if hasattr(record, 'extra_data'):
log_data["data"] = record.extra_data
return json.dumps(log_data)
# Context Variables for Request Tracking
request_id_var: ContextVar[str] = ContextVar('request_id', default='')
class RequestContextFilter(logging.Filter):
"""Add request context to logs."""
def filter(self, record):
record.request_id = request_id_var.get('')
return True
# Logger Setup
def setup_logging(log_level: str = "INFO", json_output: bool = True):
"""Configure production logging."""
logger = logging.getLogger()
logger.setLevel(getattr(logging, log_level.upper()))
# Clear existing handlers
logger.handlers.clear()
# Console handler
handler = logging.StreamHandler(sys.stdout)
if json_output:
handler.setFormatter(StructuredFormatter())
else:
handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
# Add context filter
handler.addFilter(RequestContextFilter())
logger.addHandler(handler)
return logger
# Application Logger
logger = setup_logging(json_output=True)
class ApplicationLogger:
"""Application-specific logging utilities."""
@staticmethod
def log_request(method: str, path: str, status_code: int, duration: float):
"""Log HTTP request."""
logger.info(
"HTTP Request",
extra={
"extra_data": {
"method": method,
"path": path,
"status_code": status_code,
"duration_ms": duration * 1000,
"request_id": request_id_var.get('')
}
}
)
@staticmethod
def log_database_query(query: str, duration: float, rows_affected: int):
"""Log database query."""
logger.debug(
"Database Query",
extra={
"extra_data": {
"query": query[:200], # Truncate long queries
"duration_ms": duration * 1000,
"rows_affected": rows_affected
}
}
)
@staticmethod
def log_error(error: Exception, context: Dict[str, Any] = None):
"""Log error with context."""
logger.error(
f"Error: {str(error)}",
exc_info=True,
extra={
"extra_data": {
"error_type": type(error).__name__,
"context": context or {}
}
}
)
# Usage
def handle_request():
"""Example request handling with logging."""
request_id_var.set(str(uuid.uuid4()))
try:
logger.info("Processing request")
# Process request
ApplicationLogger.log_request("GET", "/api/users", 200, 0.05)
except Exception as e:
ApplicationLogger.log_error(e, {"user_id": 123})
raise
βΉοΈ
Use structured logging (JSON format) in production for better log aggregation and analysis with tools like ELK Stack or Datadog.
Monitoring and Metrics
from prometheus_client import Counter, Histogram, Gauge, start_http_server
from typing import Callable
import time
from functools import wraps
from contextlib import contextmanager
# Prometheus Metrics
REQUEST_COUNT = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status_code']
)
REQUEST_LATENCY = Histogram(
'http_request_duration_seconds',
'HTTP request latency',
['method', 'endpoint'],
buckets=[.005, .01, .025, .05, .1, .25, .5, 1.0, 2.5, 5.0]
)
ACTIVE_REQUESTS = Gauge(
'http_requests_active',
'Number of active HTTP requests'
)
DB_QUERY_COUNT = Counter(
'database_queries_total',
'Total database queries',
['operation', 'table']
)
DB_QUERY_LATENCY = Histogram(
'database_query_duration_seconds',
'Database query latency',
['operation', 'table']
)
# Metrics Middleware
class MetricsMiddleware:
"""Collect metrics for all requests."""
def __init__(self, app):
self.app = app
def __call__(self, environ, start_response):
start_time = time.time()
ACTIVE_REQUESTS.inc()
def custom_start_response(status, headers, exc_info=None):
# Extract status code
status_code = int(status.split(' ')[0])
# Record metrics
REQUEST_COUNT.labels(
method=environ.get('REQUEST_METHOD', 'UNKNOWN'),
endpoint=environ.get('PATH_INFO', '/'),
status_code=status_code
).inc()
REQUEST_LATENCY.labels(
method=environ.get('REQUEST_METHOD', 'UNKNOWN'),
endpoint=environ.get('PATH_INFO', '/')
).observe(time.time() - start_time)
ACTIVE_REQUESTS.dec()
return start_response(status, headers, exc_info)
return self.app(environ, custom_start_response)
# Performance Monitoring
class PerformanceMonitor:
"""Application performance monitoring."""
def __init__(self):
self.metrics = {}
@contextmanager
def track_operation(self, operation_name: str):
"""Track operation duration."""
start_time = time.time()
try:
yield
finally:
duration = time.time() - start_time
self.record_metric(operation_name, duration)
def record_metric(self, name: str, value: float):
"""Record a metric value."""
if name not in self.metrics:
self.metrics[name] = []
self.metrics[name].append({
'value': value,
'timestamp': datetime.now()
})
# Keep only last 1000 metrics
if len(self.metrics[name]) > 1000:
self.metrics[name] = self.metrics[name][-1000:]
def get_stats(self, name: str) -> dict:
"""Get statistics for a metric."""
if name not in self.metrics:
return {}
values = [m['value'] for m in self.metrics[name]]
return {
'count': len(values),
'sum': sum(values),
'avg': sum(values) / len(values),
'min': min(values),
'max': max(values)
}
# Health Check Endpoints
class HealthChecker:
"""Application health checks."""
def __init__(self):
self.checks = {}
def register_check(self, name: str, check_func: Callable):
"""Register a health check."""
self.checks[name] = check_func
def run_checks(self) -> dict:
"""Run all health checks."""
results = {}
all_healthy = True
for name, check_func in self.checks.items():
try:
is_healthy = check_func()
results[name] = {"status": "healthy" if is_healthy else "unhealthy"}
if not is_healthy:
all_healthy = False
except Exception as e:
results[name] = {"status": "error", "error": str(e)}
all_healthy = False
return {
"status": "healthy" if all_healthy else "unhealthy",
"checks": results
}
# Usage
monitor = PerformanceMonitor()
health_checker = HealthChecker()
def check_database():
"""Check database connectivity."""
return True # Actual check implementation
def check_cache():
"""Check cache connectivity."""
return True # Actual check implementation
health_checker.register_check("database", check_database)
health_checker.register_check("cache", check_cache)
# Start metrics server
# start_http_server(8000)
Configuration Management
from pydantic import BaseSettings, Field
from typing import Optional, List
from functools import lru_cache
import os
from pathlib import Path
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
class Settings(BaseSettings):
"""Application settings with validation."""
# Application
app_name: str = "My App"
app_version: str = "1.0.0"
debug: bool = False
environment: str = Field("development", regex="^(development|staging|production)$")
# Database
database_url: str = "sqlite:///./app.db"
database_pool_size: int = 5
database_max_overflow: int = 10
# Redis
redis_url: str = "redis://localhost:6379/0"
# Security
secret_key: str = Field(..., min_length=32)
access_token_expire_minutes: int = 30
allowed_origins: List[str] = ["http://localhost:3000"]
# Logging
log_level: str = "INFO"
log_format: str = "json"
# Monitoring
enable_metrics: bool = True
metrics_port: int = 9090
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
case_sensitive = False
@lru_cache()
def get_settings() -> Settings:
"""Get cached settings instance."""
return Settings()
# Environment-specific configurations
class DevelopmentSettings(Settings):
"""Development environment settings."""
debug: bool = True
database_url: str = "sqlite:///./dev.db"
log_level: str = "DEBUG"
class ProductionSettings(Settings):
"""Production environment settings."""
debug: bool = False
database_pool_size: int = 20
database_max_overflow: int = 30
log_level: str = "WARNING"
# Configuration factory
def get_environment_settings() -> Settings:
"""Get settings based on environment."""
env = os.getenv("ENVIRONMENT", "development")
if env == "production":
return ProductionSettings()
elif env == "development":
return DevelopmentSettings()
else:
return Settings()
# Usage
settings = get_settings()
print(f"App: {settings.app_name}, Env: {settings.environment}")
β οΈ
Never commit sensitive configuration to version control. Use environment variables or secret management systems.
Containerization with Docker
# Dockerfile example (would be in Dockerfile, not .py)
"""
FROM python:3.11-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
&& rm -rf /var/lib/apt/lists/*
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create non-root user
RUN useradd --create-home --shell /bin/bash appuser
USER appuser
# Expose port
EXPOSE 8000
# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# Run application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
"""
# Docker Compose configuration
"""
version: '3.8'
services:
web:
build: .
ports:
- "8000:8000"
environment:
- ENVIRONMENT=production
- DATABASE_URL=postgresql://user:pass@db:5432/app
- REDIS_URL=redis://redis:6379/0
depends_on:
- db
- redis
volumes:
- ./logs:/app/logs
restart: unless-stopped
db:
image: postgres:15
environment:
- POSTGRES_USER=user
- POSTGRES_PASSWORD=pass
- POSTGRES_DB=app
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- "5432:5432"
redis:
image: redis:7-alpine
ports:
- "6379:6379"
nginx:
image: nginx:alpine
ports:
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
depends_on:
- web
volumes:
postgres_data:
"""
# Python Docker utilities
class DockerManager:
"""Docker container management utilities."""
@staticmethod
def get_container_stats(container_name: str) -> dict:
"""Get container statistics."""
import subprocess
result = subprocess.run(
["docker", "stats", container_name, "--no-stream", "--format", "{{json .}}"],
capture_output=True,
text=True
)
if result.returncode == 0:
return json.loads(result.stdout)
return {}
@staticmethod
def health_check(url: str, timeout: int = 5) -> bool:
"""Check container health."""
import requests
try:
response = requests.get(url, timeout=timeout)
return response.status_code == 200
except requests.RequestException:
return False
Deployment Patterns
from enum import Enum
from typing import Optional
import subprocess
from dataclasses import dataclass
from datetime import datetime
class DeploymentStrategy(Enum):
"""Deployment strategies."""
BLUE_GREEN = "blue_green"
CANARY = "canary"
ROLLING = "rolling"
@dataclass
class DeploymentConfig:
"""Deployment configuration."""
strategy: DeploymentStrategy
version: str
replicas: int = 3
health_check_url: str = "/health"
rollback_on_failure: bool = True
class DeploymentManager:
"""Manage application deployments."""
def __init__(self, config: DeploymentConfig):
self.config = config
self.deployment_history = []
def deploy(self) -> bool:
"""Execute deployment."""
print(f"Deploying version {self.config.version} using {self.config.strategy.value}")
try:
if self.config.strategy == DeploymentStrategy.BLUE_GREEN:
return self._blue_green_deploy()
elif self.config.strategy == DeploymentStrategy.CANARY:
return self._canary_deploy()
elif self.config.strategy == DeploymentStrategy.ROLLING:
return self._rolling_deploy()
except Exception as e:
print(f"Deployment failed: {e}")
if self.config.rollback_on_failure:
self.rollback()
return False
return True
def _blue_green_deploy(self) -> bool:
"""Blue-green deployment."""
print("Switching traffic to green environment")
# Implementation would switch load balancer
return True
def _canary_deploy(self) -> bool:
"""Canary deployment."""
print("Gradually shifting traffic to new version")
# Implementation would adjust traffic percentages
return True
def _rolling_deploy(self) -> bool:
"""Rolling deployment."""
print("Updating instances one by one")
# Implementation would update pods/instances
return True
def rollback(self) -> bool:
"""Rollback to previous version."""
print("Rolling back deployment")
# Implementation would restore previous version
return True
def record_deployment(self, success: bool):
"""Record deployment in history."""
self.deployment_history.append({
'version': self.config.version,
'strategy': self.config.strategy.value,
'timestamp': datetime.now(),
'success': success
})
# CI/CD Pipeline Integration
class CICDPipeline:
"""CI/CD pipeline configuration."""
def __init__(self):
self.stages = []
def add_stage(self, name: str, command: str):
"""Add pipeline stage."""
self.stages.append({'name': name, 'command': command})
def run(self) -> bool:
"""Execute pipeline."""
for stage in self.stages:
print(f"Running stage: {stage['name']}")
result = subprocess.run(
stage['command'],
shell=True,
capture_output=True,
text=True
)
if result.returncode != 0:
print(f"Stage {stage['name']} failed: {result.stderr}")
return False
print(f"Stage {stage['name']} completed successfully")
return True
# Example pipeline
pipeline = CICDPipeline()
pipeline.add_stage("lint", "flake8 .")
pipeline.add_stage("test", "pytest tests/")
pipeline.add_stage("build", "docker build -t myapp:latest .")
pipeline.add_stage("push", "docker push myapp:latest")
βΉοΈ
Always implement proper health checks, graceful shutdown, and monitoring in production deployments.
Graceful Shutdown
import signal
import sys
from typing import Callable
import asyncio
class GracefulShutdown:
"""Handle graceful application shutdown."""
def __init__(self):
self.shutdown_hooks = []
self.is_shutting_down = False
def register_hook(self, hook: Callable):
"""Register shutdown hook."""
self.shutdown_hooks.append(hook)
def signal_handler(self, signum, frame):
"""Handle shutdown signals."""
print(f"Received signal {signum}, initiating graceful shutdown...")
self.is_shutting_down = True
# Execute shutdown hooks
for hook in self.shutdown_hooks:
try:
hook()
except Exception as e:
print(f"Error in shutdown hook: {e}")
print("Graceful shutdown complete")
sys.exit(0)
def setup(self):
"""Setup signal handlers."""
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
# Usage
shutdown_manager = GracefulShutdown()
def cleanup_database():
"""Close database connections."""
print("Closing database connections")
def cleanup_cache():
"""Flush cache."""
print("Flushing cache")
shutdown_manager.register_hook(cleanup_database)
shutdown_manager.register_hook(cleanup_cache)
shutdown_manager.setup()
# Application main loop
async def main():
"""Main application loop."""
while not shutdown_manager.is_shutting_down:
# Process requests
await asyncio.sleep(0.1)
# asyncio.run(main())
Follow-Up Questions
-
Explain the difference between blue-green and canary deployments.
-
How do you handle database migrations in production?
-
What are the best practices for logging in microservices?
-
How do you implement circuit breakers for external dependencies?
-
Explain the concept of observability (logs, metrics, traces).