๐ŸŽ‰ 75% of content is free forever โ€” Unlock Premium from $10/mo โ†’
CW
Search coursesโ€ฆ
๐Ÿ’ผ Servicesโ„น๏ธ Aboutโœ‰๏ธ ContactView Pricing Plansfrom $10

Design a Cloud-Native Application: 12-Factor App

Cloud ArchitectureCloud-Native Designโญ Premium

Advertisement

Cloud-Native Architecture: 12-Factor App

Difficulty: Senior/Staff Level | Companies: Netflix, Uber, Airbnb, Amazon, Google

Interview Question

"Design a cloud-native application following the 12-Factor App methodology. How would you handle configuration management, state management, and service decomposition for a platform serving 100M+ users?"

โ„น๏ธKey Concepts

This question tests your understanding of cloud-native principles, distributed systems design, and practical implementation of modern architecture patterns.

The 12-Factor App Methodology

Complete Architecture Overview

Architecture Diagram
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                    12-FACTOR CLOUD-NATIVE ARCHITECTURE               โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                                     โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”                โ”‚
โ”‚  โ”‚   CODEBASE  โ”‚  โ”‚ DEPENDENCIESโ”‚  โ”‚ CONFIG      โ”‚                โ”‚
โ”‚  โ”‚  (Git Repo) โ”‚  โ”‚ (Package)   โ”‚  โ”‚ (Env Vars)  โ”‚                โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”˜                โ”‚
โ”‚         โ”‚                โ”‚                โ”‚                         โ”‚
โ”‚         โ–ผ                โ–ผ                โ–ผ                         โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”                โ”‚
โ”‚  โ”‚   BACKING   โ”‚  โ”‚  BUILD      โ”‚  โ”‚   PROCESSES โ”‚                โ”‚
โ”‚  โ”‚   SERVICES  โ”‚  โ”‚  RELEASE    โ”‚  โ”‚  (Stateless)โ”‚                โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”˜                โ”‚
โ”‚         โ”‚                โ”‚                โ”‚                         โ”‚
โ”‚         โ–ผ                โ–ผ                โ–ผ                         โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”                โ”‚
โ”‚  โ”‚   CONCURRENCYโ”‚  โ”‚  DISPOSABILITYโ”‚ โ”‚ PORT BINDINGโ”‚               โ”‚
โ”‚  โ”‚  (Scale Out)โ”‚  โ”‚  (Fast Startup)โ”‚ โ”‚ (HTTP)      โ”‚               โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”˜                โ”‚
โ”‚         โ”‚                โ”‚                โ”‚                         โ”‚
โ”‚         โ–ผ                โ–ผ                โ–ผ                         โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”                โ”‚
โ”‚  โ”‚   DEV/PROD  โ”‚  โ”‚  LOGS       โ”‚  โ”‚   ADMIN     โ”‚                โ”‚
โ”‚  โ”‚  PARITY     โ”‚  โ”‚  (Events)   โ”‚  โ”‚  PROCESSES  โ”‚                โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜                โ”‚
โ”‚                                                                     โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Mathematical Foundation: Capacity Planning

For a platform serving 100M+ users, we need to calculate resource requirements:

User Distribution Model:

  • Total users: N = 100,000,000
  • Daily active users: D = 0.3 ร— N = 30,000,000
  • Requests per user per day: R = 50
  • Total daily requests: Q_d = D ร— R = 1.5 ร— 10^9

Peak Load Calculation:

  • Peak multiplier: M = 3x average
  • Peak requests per second: Q_peak = (Q_d ร— M) / 86,400
  • Q_peak = (1.5 ร— 10^9 ร— 3) / 86,400 โ‰ˆ 52,083 RPS

Resource Requirements:

  • Average response time: T = 100ms
  • Throughput per instance: I = 1/T = 10 req/s
  • Minimum instances needed: I_min = Q_peak / I = 5,209 instances

Implementation: Factor 1 - Codebase

# AWS CodeCommit for source control
resource "aws_codecommit_repository" "cloud_native_app" {
  repository_name = "cloud-native-app"
  description     = "12-Factor cloud-native application"

  tags = {
    Environment = var.environment
    ManagedBy   = "Terraform"
  }
}

# CodePipeline for CI/CD
resource "aws_codepipeline" "main_pipeline" {
  name     = "cloud-native-app-pipeline"
  role_arn = aws_iam_role.pipeline_role.arn

  artifact_store {
    location = aws_s3_bucket.artifacts.bucket
    type     = "S3"
  }

  stage {
    name = "Source"
    action {
      name             = "Source"
      category         = "Source"
      owner            = "AWS"
      provider         = "CodeCommit"
      version          = "1"
      output_artifacts = ["source_output"]

      configuration = {
        RepositoryName       = aws_codecommit_repository.cloud_native_app.repository_name
        BranchName           = "main"
        PollForSourceChanges = false
      }
    }
  }

  stage {
    name = "Build"
    action {
      name             = "Build"
      category         = "Build"
      owner            = "AWS"
      provider         = "CodeBuild"
      input_artifacts  = ["source_output"]
      output_artifacts = ["build_output"]
      version          = "1"

      configuration = {
        ProjectName = aws_codebuild_project.build.name
      }
    }
  }
}

Factor 2 - Dependencies

# requirements.txt (Python example)
# All dependencies explicitly declared

# Core framework
fastapi==0.104.1
uvicorn[standard]==0.24.0

# Database drivers
psycopg2-binary==2.9.9
motor==3.3.2  # Async MongoDB driver

# AWS SDK
boto3==1.33.6
botocore==1.33.6

# Monitoring
prometheus-client==0.19.0
opentelemetry-api==1.21.0
opentelemetry-sdk==1.21.0

# Serialization
pydantic==2.5.2
orjson==3.9.10

# Testing
pytest==7.4.3
pytest-asyncio==0.23.2
httpx==0.25.2

โš ๏ธDependency Management

Never mix development and production dependencies in the same environment. Use virtual environments or containers to ensure isolation.

Factor 3 - Configuration Management

# AWS Parameter Store for configuration
resource "aws_ssm_parameter" "app_config" {
  for_each = {
    "/app/database/host"     = var.db_host
    "/app/database/port"     = var.db_port
    "/app/redis/host"        = var.redis_host
    "/app/api/key"           = var.api_key
    "/app/feature/flags"     = jsonencode(var.feature_flags)
  }

  name        = each.key
  description = "Application configuration parameter"
  type        = "SecureString"
  value       = each.value

  tags = {
    Environment = var.environment
  }
}

# ECS task definition with environment from Parameter Store
resource "aws_ecs_task_definition" "app" {
  family                   = "cloud-native-app"
  network_mode             = "awsvpc"
  requires_compatibilities = ["FARGATE"]
  cpu                      = "1024"
  memory                   = "2048"
  execution_role_arn       = aws_iam_role.ecs_execution.arn
  task_role_arn            = aws_iam_role.ecs_task.arn

  container_definitions = jsonencode([
    {
      name  = "app"
      image = "${aws_ecr_repository.app.repository_url}:latest"

      portMappings = [
        {
          containerPort = 8000
          hostPort      = 8000
          protocol      = "tcp"
        }
      ]

      environment = [
        {
          name  = "APP_ENV"
          value = var.environment
        },
        {
          name  = "APP_REGION"
          value = var.aws_region
        }
      ]

      secrets = [
        {
          name      = "DATABASE_URL"
          valueFrom = aws_ssm_parameter.db_url.arn
        },
        {
          name      = "REDIS_URL"
          valueFrom = aws_ssm_parameter.redis_url.arn
        }
      ]

      logConfiguration = {
        logDriver = "awslogs"
        options = {
          "awslogs-group"         = aws_cloudwatch_log_group.app.name
          "awslogs-region"        = var.aws_region
          "awslogs-stream-prefix" = "app"
        }
      }
    }
  ])
}

Factor 4 - Backing Services

# Service abstraction layer
from abc import ABC, abstractmethod
from typing import Any, Optional
import asyncio
from dataclasses import dataclass
from enum import Enum

class ServiceType(Enum):
    DATABASE = "database"
    CACHE = "cache"
    QUEUE = "queue"
    STORAGE = "storage"

@dataclass
class ServiceConfig:
    host: str
    port: int
    credentials: Optional[dict] = None
    options: Optional[dict] = None

class BackingService(ABC):
    """Abstract base class for all backing services"""

    @abstractmethod
    async def connect(self) -> None:
        pass

    @abstractmethod
    async def disconnect(self) -> None:
        pass

    @abstractmethod
    async def health_check(self) -> bool:
        pass

class PostgreSQLService(BackingService):
    def __init__(self, config: ServiceConfig):
        self.config = config
        self.pool = None

    async def connect(self):
        import psycopg2
        from psycopg2 import pool

        self.pool = pool.ThreadedConnectionPool(
            minconn=5,
            maxconn=20,
            host=self.config.host,
            port=self.config.port,
            **self.config.credentials
        )

    async def disconnect(self):
        if self.pool:
            self.pool.closeall()

    async def health_check(self) -> bool:
        try:
            conn = self.pool.getconn()
            cursor = conn.cursor()
            cursor.execute("SELECT 1")
            cursor.close()
            self.pool.putconn(conn)
            return True
        except Exception:
            return False

class RedisService(BackingService):
    def __init__(self, config: ServiceConfig):
        self.config = config
        self.client = None

    async def connect(self):
        import redis.asyncio as redis
        self.client = redis.Redis(
            host=self.config.host,
            port=self.config.port,
            **self.config.options
        )

    async def disconnect(self):
        if self.client:
            await self.client.close()

    async def health_check(self) -> bool:
        try:
            await self.client.ping()
            return True
        except Exception:
            return False

class ServiceManager:
    """Manages all backing services"""

    def __init__(self):
        self.services: dict[str, BackingService] = {}

    def register(self, name: str, service: BackingService):
        self.services[name] = service

    async def connect_all(self):
        await asyncio.gather(
            *[service.connect() for service in self.services.values()]
        )

    async def disconnect_all(self):
        await asyncio.gather(
            *[service.disconnect() for service in self.services.values()]
        )

    async def health_check_all(self) -> dict[str, bool]:
        results = {}
        for name, service in self.services.items():
            results[name] = await service.health_check()
        return results

โœ…Best Practice

Backing services should be treated as attached resources. The application should be able to connect to any backing service (local or cloud) without code changes.

Factor 5 - Build, Release, Run

# Build pipeline configuration
# buildspec.yml for AWS CodeBuild
version: 0.2

phases:
  pre_build:
    commands:
      - echo Logging in to Amazon ECR...
      - aws ecr get-login-password --region $AWS_DEFAULT_REGION | docker login --username AWS --password-stdin $AWS_ACCOUNT_ID.dkr.ecr.$AWS_DEFAULT_REGION.amazonaws.com
      - COMMIT_HASH=$(echo $CODEBUILD_RESOLVED_SOURCE_VERSION | cut -c 1-7)
      - IMAGE_TAG=${COMMIT_HASH:=latest}

  build:
    commands:
      - echo Build started on `date`
      - echo Building the Docker image...
      - docker build -t $REPOSITORY_URI:latest .
      - docker tag $REPOSITORY_URI:latest $REPOSITORY_URI:$IMAGE_TAG

  post_build:
    commands:
      - echo Build completed on `date`
      - docker push $REPOSITORY_URI:latest
      - docker push $REPOSITORY_URI:$IMAGE_TAG
      - echo Writing image definition file...
      - printf '[{"name":"app","imageUri":"%s"}]' $REPOSITORY_URI:$IMAGE_TAG > imagedefinitions.json

artifacts:
  files:
    - imagedefinitions.json
    - cloudformation/**/*

cache:
  paths:
    - '/root/.cache/pip'
    - '/root/.docker'
# Release management with AWS ECS
resource "aws_ecs_service" "app" {
  name            = "cloud-native-app"
  cluster         = aws_ecs_cluster.main.id
  task_definition = aws_ecs_task_definition.app.arn
  desired_count   = var.desired_count
  launch_type     = "FARGATE"

  deployment_configuration {
    maximum_percent         = 200
    minimum_healthy_percent = 100
    deployment_circuit_breaker {
      enable   = true
      rollback = true
    }
  }

  network_configuration {
    security_groups  = [aws_security_group.app.id]
    subnets          = aws_subnet.private[*].id
    assign_public_ip = false
  }

  load_balancer {
    target_group_arn = aws_lb_target_group.app.arn
    container_name   = "app"
    container_port   = 8000
  }

  depends_on = [aws_lb_listener.https]
}

Factor 6 - Processes

# Stateless process design
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import uuid
from datetime import datetime

app = FastAPI()

class SessionData(BaseModel):
    user_id: str
    preferences: dict
    timestamp: datetime

class ProcessManager:
    """Manages stateless processes"""

    def __init__(self):
        self.process_id = str(uuid.uuid4())
        self.start_time = datetime.utcnow()

    def get_process_info(self) -> dict:
        return {
            "process_id": self.process_id,
            "start_time": self.start_time.isoformat(),
            "uptime_seconds": (datetime.utcnow() - self.start_time).total_seconds()
        }

# No state stored in process - all state in external services
@app.post("/api/v1/sessions")
async def create_session(user_id: str):
    session_id = str(uuid.uuid4())
    # Store in Redis, not in process memory
    session_data = SessionData(
        user_id=user_id,
        preferences={},
        timestamp=datetime.utcnow()
    )
    # redis.set(f"session:{session_id}", session_data.json())
    return {"session_id": session_id}

@app.get("/api/v1/sessions/{session_id}")
async def get_session(session_id: str):
    # Retrieve from external store, not from process memory
    # session = redis.get(f"session:{session_id}")
    # if not session:
    #     raise HTTPException(status_code=404, detail="Session not found")
    # return SessionData.parse_raw(session)
    return {"session_id": session_id, "data": {}}

Factor 7 - Port Binding

# Self-contained HTTP server
import uvicorn
from fastapi import FastAPI
import os

app = FastAPI()

@app.get("/health")
async def health():
    return {"status": "healthy"}

@app.get("/")
async def root():
    return {"message": "Hello from 12-Factor App"}

if __name__ == "__main__":
    port = int(os.getenv("PORT", 8000))
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=port,
        log_level="info"
    )

Factor 8 - Concurrency

# Process model: Scale out via processes
import asyncio
from typing import List
from concurrent.futures import ProcessPoolExecutor
import multiprocessing

class ConcurrencyManager:
    """Manages process-based concurrency"""

    def __init__(self, max_workers: int = None):
        self.max_workers = max_workers or multiprocessing.cpu_count()
        self.executor = ProcessPoolExecutor(max_workers=self.max_workers)

    async def process_task(self, task_func, *args):
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(self.executor, task_func, *args)

    async def process_batch(self, tasks: List[tuple]) -> List:
        coroutines = [
            self.process_task(task_func, *args)
            for task_func, *args in tasks
        ]
        return await asyncio.gather(*coroutines)

# Thread model: Scale out via threads (for I/O-bound work)
class ThreadManager:
    """Manages thread-based concurrency"""

    def __init__(self, max_workers: int = 50):
        self.max_workers = max_workers
        self.semaphore = asyncio.Semaphore(max_workers)

    async def limited_task(self, coro):
        async with self.semaphore:
            return await coro

# Horizontal scaling calculation
# For CPU-bound work:
#   Optimal processes = CPU cores ร— (1 + I/O wait time / CPU time)
#
# For I/O-bound work:
#   Optimal threads = (Total latency per request ร— Target RPS) / 1000
#
# Example:
#   Latency = 100ms, Target RPS = 1000
#   Threads needed = (100 ร— 1000) / 1000 = 100 threads per instance

Factor 9 - Disposability

# Fast startup and graceful shutdown
import signal
import sys
from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncio

shutdown_event = asyncio.Event()

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    print("Starting up...")
    # Initialize connections, load configs, etc.

    yield

    # Shutdown
    print("Shutting down gracefully...")
    shutdown_event.set()
    # Close connections, flush buffers, complete in-flight requests

app = FastAPI(lifespan=lifespan)

def signal_handler(signum, frame):
    print(f"Received signal {signum}, initiating graceful shutdown...")
    shutdown_event.set()

# Register signal handlers
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)

@app.get("/health")
async def health():
    return {"status": "healthy"}

@app.get("/ready")
async def readiness():
    # Check if all dependencies are ready
    return {"ready": not shutdown_event.is_set()}

โ„น๏ธDisposability

Processes should be disposable: start up quickly and shut down gracefully. This enables rapid deployment, scaling, and recovery.

Factor 10 - Dev/Prod Parity

# Infrastructure as Code ensures parity
module "dev_environment" {
  source = "./modules/environment"

  environment     = "dev"
  instance_type   = "t3.medium"
  desired_count   = 2
  database_instance = "db.t3.micro"
  enable_monitoring = true
}

module "staging_environment" {
  source = "./modules/environment"

  environment     = "staging"
  instance_type   = "t3.large"
  desired_count   = 4
  database_instance = "db.t3.small"
  enable_monitoring = true
}

module "production_environment" {
  source = "./modules/environment"

  environment     = "production"
  instance_type   = "c5.xlarge"
  desired_count   = 10
  database_instance = "db.r5.large"
  enable_monitoring = true
}
# Feature flags for parity
from enum import Enum
from typing import Dict, Any
import json

class FeatureFlag:
    """Feature flags for dev/prod parity"""

    def __init__(self, name: str, enabled: bool = False, config: Dict[str, Any] = None):
        self.name = name
        self.enabled = enabled
        self.config = config or {}

    def is_enabled(self) -> bool:
        return self.enabled

    def get_config(self) -> Dict[str, Any]:
        return self.config

class FeatureManager:
    """Manages feature flags across environments"""

    def __init__(self):
        self.flags: Dict[str, FeatureFlag] = {}

    def register(self, flag: FeatureFlag):
        self.flags[flag.name] = flag

    def is_enabled(self, flag_name: str) -> bool:
        flag = self.flags.get(flag_name)
        return flag.is_enabled() if flag else False

# Example usage
feature_manager = FeatureManager()
feature_manager.register(FeatureFlag("new_checkout_flow", enabled=True))
feature_manager.register(FeatureFlag("advanced_analytics", enabled=True, config={"sample_rate": 0.1}))

Factor 11 - Logs

# Structured logging as event streams
import logging
import json
from datetime import datetime
from typing import Any, Dict
import uuid

class StructuredLogger:
    """Structured logging for cloud-native applications"""

    def __init__(self, service_name: str):
        self.service_name = service_name
        self.logger = logging.getLogger(service_name)

    def log_event(self, event_type: str, data: Dict[str, Any], level: str = "INFO"):
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "service": self.service_name,
            "event_type": event_type,
            "level": level,
            "trace_id": str(uuid.uuid4()),
            "data": data
        }
        self.logger.info(json.dumps(log_entry))

    def log_request(self, request_id: str, method: str, path: str, status_code: int, duration_ms: float):
        self.log_event("http_request", {
            "request_id": request_id,
            "method": method,
            "path": path,
            "status_code": status_code,
            "duration_ms": duration_ms
        })

    def log_error(self, error: Exception, context: Dict[str, Any] = None):
        self.log_event("error", {
            "error_type": type(error).__name__,
            "error_message": str(error),
            "context": context or {}
        }, level="ERROR")

# Usage
logger = StructuredLogger("cloud-native-app")
logger.log_request("req-123", "GET", "/api/users", 200, 45.2)

Factor 12 - Admin Processes

# One-off administrative tasks
import asyncio
from typing import List, Dict, Any

class AdminProcess:
    """Handles one-off administrative tasks"""

    @staticmethod
    async def migrate_database():
        """Run database migrations"""
        print("Running database migrations...")
        # Implement migration logic
        pass

    @staticmethod
    async def seed_data():
        """Seed initial data"""
        print("Seeding initial data...")
        # Implement seeding logic
        pass

    @staticmethod
    async def cleanup_old_data():
        """Clean up old data"""
        print("Cleaning up old data...")
        # Implement cleanup logic
        pass

# Run admin tasks
async def run_admin_task(task_name: str):
    tasks = {
        "migrate": AdminProcess.migrate_database,
        "seed": AdminProcess.seed_data,
        "cleanup": AdminProcess.cleanup_old_data
    }

    if task_name in tasks:
        await tasks[task_name]()
    else:
        print(f"Unknown task: {task_name}")

# Execute via CLI or container
if __name__ == "__main__":
    import sys
    if len(sys.argv) > 1:
        asyncio.run(run_admin_task(sys.argv[1]))

โœ…12-Factor Summary

Following the 12-Factor App methodology ensures your application is portable, scalable, and resilient. Each factor addresses a specific aspect of cloud-native development.

Summary Table

FactorDescriptionImplementation
I. CodebaseOne codebase in version controlGit + CodeCommit
II. DependenciesExplicitly declare dependenciesrequirements.txt
III. ConfigStore config in environmentParameter Store
IV. Backing ServicesTreat as attached resourcesService abstraction
V. Build/Release/RunSeparate build and run stagesCI/CD pipeline
VI. ProcessesStateless processesRedis for state
VII. Port BindingExport services via port bindinguvicorn on PORT
VIII. ConcurrencyScale via process modelHorizontal scaling
IX. DisposabilityFast startup, graceful shutdownSignal handlers
X. Dev/Prod ParityKeep environments similarTerraform modules
XI. LogsTreat logs as event streamsStructured logging
XII. AdminRun as one-off processesManagement scripts

Advertisement