πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Production Python Patterns

Python InterviewProduction Readiness⭐ Premium

Advertisement

Production Python Patterns

Difficulty: Hard | Companies: Google, Meta, Amazon, Netflix, Stripe

Logging Best Practices

import logging
import json
import sys
from datetime import datetime
from typing import Any, Dict
from contextvars import ContextVar
import uuid

# Structured Logging
class StructuredFormatter(logging.Formatter):
    """JSON structured log formatter."""
    
    def format(self, record: logging.LogRecord) -> str:
        log_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }
        
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)
        
        # Add custom fields
        if hasattr(record, 'extra_data'):
            log_data["data"] = record.extra_data
        
        return json.dumps(log_data)

# Context Variables for Request Tracking
request_id_var: ContextVar[str] = ContextVar('request_id', default='')

class RequestContextFilter(logging.Filter):
    """Add request context to logs."""
    
    def filter(self, record):
        record.request_id = request_id_var.get('')
        return True

# Logger Setup
def setup_logging(log_level: str = "INFO", json_output: bool = True):
    """Configure production logging."""
    logger = logging.getLogger()
    logger.setLevel(getattr(logging, log_level.upper()))
    
    # Clear existing handlers
    logger.handlers.clear()
    
    # Console handler
    handler = logging.StreamHandler(sys.stdout)
    
    if json_output:
        handler.setFormatter(StructuredFormatter())
    else:
        handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        ))
    
    # Add context filter
    handler.addFilter(RequestContextFilter())
    
    logger.addHandler(handler)
    
    return logger

# Application Logger
logger = setup_logging(json_output=True)

class ApplicationLogger:
    """Application-specific logging utilities."""
    
    @staticmethod
    def log_request(method: str, path: str, status_code: int, duration: float):
        """Log HTTP request."""
        logger.info(
            "HTTP Request",
            extra={
                "extra_data": {
                    "method": method,
                    "path": path,
                    "status_code": status_code,
                    "duration_ms": duration * 1000,
                    "request_id": request_id_var.get('')
                }
            }
        )
    
    @staticmethod
    def log_database_query(query: str, duration: float, rows_affected: int):
        """Log database query."""
        logger.debug(
            "Database Query",
            extra={
                "extra_data": {
                    "query": query[:200],  # Truncate long queries
                    "duration_ms": duration * 1000,
                    "rows_affected": rows_affected
                }
            }
        )
    
    @staticmethod
    def log_error(error: Exception, context: Dict[str, Any] = None):
        """Log error with context."""
        logger.error(
            f"Error: {str(error)}",
            exc_info=True,
            extra={
                "extra_data": {
                    "error_type": type(error).__name__,
                    "context": context or {}
                }
            }
        )

# Usage
def handle_request():
    """Example request handling with logging."""
    request_id_var.set(str(uuid.uuid4()))
    
    try:
        logger.info("Processing request")
        # Process request
        ApplicationLogger.log_request("GET", "/api/users", 200, 0.05)
    except Exception as e:
        ApplicationLogger.log_error(e, {"user_id": 123})
        raise

ℹ️

Use structured logging (JSON format) in production for better log aggregation and analysis with tools like ELK Stack or Datadog.

Monitoring and Metrics

from prometheus_client import Counter, Histogram, Gauge, start_http_server
from typing import Callable
import time
from functools import wraps
from contextlib import contextmanager

# Prometheus Metrics
REQUEST_COUNT = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status_code']
)

REQUEST_LATENCY = Histogram(
    'http_request_duration_seconds',
    'HTTP request latency',
    ['method', 'endpoint'],
    buckets=[.005, .01, .025, .05, .1, .25, .5, 1.0, 2.5, 5.0]
)

ACTIVE_REQUESTS = Gauge(
    'http_requests_active',
    'Number of active HTTP requests'
)

DB_QUERY_COUNT = Counter(
    'database_queries_total',
    'Total database queries',
    ['operation', 'table']
)

DB_QUERY_LATENCY = Histogram(
    'database_query_duration_seconds',
    'Database query latency',
    ['operation', 'table']
)

# Metrics Middleware
class MetricsMiddleware:
    """Collect metrics for all requests."""
    
    def __init__(self, app):
        self.app = app
    
    def __call__(self, environ, start_response):
        start_time = time.time()
        ACTIVE_REQUESTS.inc()
        
        def custom_start_response(status, headers, exc_info=None):
            # Extract status code
            status_code = int(status.split(' ')[0])
            
            # Record metrics
            REQUEST_COUNT.labels(
                method=environ.get('REQUEST_METHOD', 'UNKNOWN'),
                endpoint=environ.get('PATH_INFO', '/'),
                status_code=status_code
            ).inc()
            
            REQUEST_LATENCY.labels(
                method=environ.get('REQUEST_METHOD', 'UNKNOWN'),
                endpoint=environ.get('PATH_INFO', '/')
            ).observe(time.time() - start_time)
            
            ACTIVE_REQUESTS.dec()
            
            return start_response(status, headers, exc_info)
        
        return self.app(environ, custom_start_response)

# Performance Monitoring
class PerformanceMonitor:
    """Application performance monitoring."""
    
    def __init__(self):
        self.metrics = {}
    
    @contextmanager
    def track_operation(self, operation_name: str):
        """Track operation duration."""
        start_time = time.time()
        try:
            yield
        finally:
            duration = time.time() - start_time
            self.record_metric(operation_name, duration)
    
    def record_metric(self, name: str, value: float):
        """Record a metric value."""
        if name not in self.metrics:
            self.metrics[name] = []
        
        self.metrics[name].append({
            'value': value,
            'timestamp': datetime.now()
        })
        
        # Keep only last 1000 metrics
        if len(self.metrics[name]) > 1000:
            self.metrics[name] = self.metrics[name][-1000:]
    
    def get_stats(self, name: str) -> dict:
        """Get statistics for a metric."""
        if name not in self.metrics:
            return {}
        
        values = [m['value'] for m in self.metrics[name]]
        
        return {
            'count': len(values),
            'sum': sum(values),
            'avg': sum(values) / len(values),
            'min': min(values),
            'max': max(values)
        }

# Health Check Endpoints
class HealthChecker:
    """Application health checks."""
    
    def __init__(self):
        self.checks = {}
    
    def register_check(self, name: str, check_func: Callable):
        """Register a health check."""
        self.checks[name] = check_func
    
    def run_checks(self) -> dict:
        """Run all health checks."""
        results = {}
        all_healthy = True
        
        for name, check_func in self.checks.items():
            try:
                is_healthy = check_func()
                results[name] = {"status": "healthy" if is_healthy else "unhealthy"}
                if not is_healthy:
                    all_healthy = False
            except Exception as e:
                results[name] = {"status": "error", "error": str(e)}
                all_healthy = False
        
        return {
            "status": "healthy" if all_healthy else "unhealthy",
            "checks": results
        }

# Usage
monitor = PerformanceMonitor()
health_checker = HealthChecker()

def check_database():
    """Check database connectivity."""
    return True  # Actual check implementation

def check_cache():
    """Check cache connectivity."""
    return True  # Actual check implementation

health_checker.register_check("database", check_database)
health_checker.register_check("cache", check_cache)

# Start metrics server
# start_http_server(8000)

Configuration Management

from pydantic import BaseSettings, Field
from typing import Optional, List
from functools import lru_cache
import os
from pathlib import Path
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

class Settings(BaseSettings):
    """Application settings with validation."""
    
    # Application
    app_name: str = "My App"
    app_version: str = "1.0.0"
    debug: bool = False
    environment: str = Field("development", regex="^(development|staging|production)$")
    
    # Database
    database_url: str = "sqlite:///./app.db"
    database_pool_size: int = 5
    database_max_overflow: int = 10
    
    # Redis
    redis_url: str = "redis://localhost:6379/0"
    
    # Security
    secret_key: str = Field(..., min_length=32)
    access_token_expire_minutes: int = 30
    allowed_origins: List[str] = ["http://localhost:3000"]
    
    # Logging
    log_level: str = "INFO"
    log_format: str = "json"
    
    # Monitoring
    enable_metrics: bool = True
    metrics_port: int = 9090
    
    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"
        case_sensitive = False

@lru_cache()
def get_settings() -> Settings:
    """Get cached settings instance."""
    return Settings()

# Environment-specific configurations
class DevelopmentSettings(Settings):
    """Development environment settings."""
    debug: bool = True
    database_url: str = "sqlite:///./dev.db"
    log_level: str = "DEBUG"

class ProductionSettings(Settings):
    """Production environment settings."""
    debug: bool = False
    database_pool_size: int = 20
    database_max_overflow: int = 30
    log_level: str = "WARNING"

# Configuration factory
def get_environment_settings() -> Settings:
    """Get settings based on environment."""
    env = os.getenv("ENVIRONMENT", "development")
    
    if env == "production":
        return ProductionSettings()
    elif env == "development":
        return DevelopmentSettings()
    else:
        return Settings()

# Usage
settings = get_settings()
print(f"App: {settings.app_name}, Env: {settings.environment}")

⚠️

Never commit sensitive configuration to version control. Use environment variables or secret management systems.

Containerization with Docker

# Dockerfile example (would be in Dockerfile, not .py)
"""
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create non-root user
RUN useradd --create-home --shell /bin/bash appuser
USER appuser

# Expose port
EXPOSE 8000

# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Run application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
"""

# Docker Compose configuration
"""
version: '3.8'

services:
  web:
    build: .
    ports:
      - "8000:8000"
    environment:
      - ENVIRONMENT=production
      - DATABASE_URL=postgresql://user:pass@db:5432/app
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped

  db:
    image: postgres:15
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=app
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - web

volumes:
  postgres_data:
"""

# Python Docker utilities
class DockerManager:
    """Docker container management utilities."""
    
    @staticmethod
    def get_container_stats(container_name: str) -> dict:
        """Get container statistics."""
        import subprocess
        
        result = subprocess.run(
            ["docker", "stats", container_name, "--no-stream", "--format", "{{json .}}"],
            capture_output=True,
            text=True
        )
        
        if result.returncode == 0:
            return json.loads(result.stdout)
        return {}
    
    @staticmethod
    def health_check(url: str, timeout: int = 5) -> bool:
        """Check container health."""
        import requests
        
        try:
            response = requests.get(url, timeout=timeout)
            return response.status_code == 200
        except requests.RequestException:
            return False

Deployment Patterns

from enum import Enum
from typing import Optional
import subprocess
from dataclasses import dataclass
from datetime import datetime

class DeploymentStrategy(Enum):
    """Deployment strategies."""
    BLUE_GREEN = "blue_green"
    CANARY = "canary"
    ROLLING = "rolling"

@dataclass
class DeploymentConfig:
    """Deployment configuration."""
    strategy: DeploymentStrategy
    version: str
    replicas: int = 3
    health_check_url: str = "/health"
    rollback_on_failure: bool = True

class DeploymentManager:
    """Manage application deployments."""
    
    def __init__(self, config: DeploymentConfig):
        self.config = config
        self.deployment_history = []
    
    def deploy(self) -> bool:
        """Execute deployment."""
        print(f"Deploying version {self.config.version} using {self.config.strategy.value}")
        
        try:
            if self.config.strategy == DeploymentStrategy.BLUE_GREEN:
                return self._blue_green_deploy()
            elif self.config.strategy == DeploymentStrategy.CANARY:
                return self._canary_deploy()
            elif self.config.strategy == DeploymentStrategy.ROLLING:
                return self._rolling_deploy()
        except Exception as e:
            print(f"Deployment failed: {e}")
            if self.config.rollback_on_failure:
                self.rollback()
            return False
        
        return True
    
    def _blue_green_deploy(self) -> bool:
        """Blue-green deployment."""
        print("Switching traffic to green environment")
        # Implementation would switch load balancer
        return True
    
    def _canary_deploy(self) -> bool:
        """Canary deployment."""
        print("Gradually shifting traffic to new version")
        # Implementation would adjust traffic percentages
        return True
    
    def _rolling_deploy(self) -> bool:
        """Rolling deployment."""
        print("Updating instances one by one")
        # Implementation would update pods/instances
        return True
    
    def rollback(self) -> bool:
        """Rollback to previous version."""
        print("Rolling back deployment")
        # Implementation would restore previous version
        return True
    
    def record_deployment(self, success: bool):
        """Record deployment in history."""
        self.deployment_history.append({
            'version': self.config.version,
            'strategy': self.config.strategy.value,
            'timestamp': datetime.now(),
            'success': success
        })

# CI/CD Pipeline Integration
class CICDPipeline:
    """CI/CD pipeline configuration."""
    
    def __init__(self):
        self.stages = []
    
    def add_stage(self, name: str, command: str):
        """Add pipeline stage."""
        self.stages.append({'name': name, 'command': command})
    
    def run(self) -> bool:
        """Execute pipeline."""
        for stage in self.stages:
            print(f"Running stage: {stage['name']}")
            
            result = subprocess.run(
                stage['command'],
                shell=True,
                capture_output=True,
                text=True
            )
            
            if result.returncode != 0:
                print(f"Stage {stage['name']} failed: {result.stderr}")
                return False
            
            print(f"Stage {stage['name']} completed successfully")
        
        return True

# Example pipeline
pipeline = CICDPipeline()
pipeline.add_stage("lint", "flake8 .")
pipeline.add_stage("test", "pytest tests/")
pipeline.add_stage("build", "docker build -t myapp:latest .")
pipeline.add_stage("push", "docker push myapp:latest")

ℹ️

Always implement proper health checks, graceful shutdown, and monitoring in production deployments.

Graceful Shutdown

import signal
import sys
from typing import Callable
import asyncio

class GracefulShutdown:
    """Handle graceful application shutdown."""
    
    def __init__(self):
        self.shutdown_hooks = []
        self.is_shutting_down = False
    
    def register_hook(self, hook: Callable):
        """Register shutdown hook."""
        self.shutdown_hooks.append(hook)
    
    def signal_handler(self, signum, frame):
        """Handle shutdown signals."""
        print(f"Received signal {signum}, initiating graceful shutdown...")
        self.is_shutting_down = True
        
        # Execute shutdown hooks
        for hook in self.shutdown_hooks:
            try:
                hook()
            except Exception as e:
                print(f"Error in shutdown hook: {e}")
        
        print("Graceful shutdown complete")
        sys.exit(0)
    
    def setup(self):
        """Setup signal handlers."""
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

# Usage
shutdown_manager = GracefulShutdown()

def cleanup_database():
    """Close database connections."""
    print("Closing database connections")

def cleanup_cache():
    """Flush cache."""
    print("Flushing cache")

shutdown_manager.register_hook(cleanup_database)
shutdown_manager.register_hook(cleanup_cache)
shutdown_manager.setup()

# Application main loop
async def main():
    """Main application loop."""
    while not shutdown_manager.is_shutting_down:
        # Process requests
        await asyncio.sleep(0.1)

# asyncio.run(main())

Follow-Up Questions

  1. Explain the difference between blue-green and canary deployments.

  2. How do you handle database migrations in production?

  3. What are the best practices for logging in microservices?

  4. How do you implement circuit breakers for external dependencies?

  5. Explain the concept of observability (logs, metrics, traces).

Advertisement