Cloud-Native Architecture: 12-Factor App
Difficulty: Senior/Staff Level | Companies: Netflix, Uber, Airbnb, Amazon, Google
Interview Question
"Design a cloud-native application following the 12-Factor App methodology. How would you handle configuration management, state management, and service decomposition for a platform serving 100M+ users?"
ℹ️ Key Concepts
This question tests your understanding of cloud-native principles, distributed systems design, and practical implementation of modern architecture patterns.
The 12-Factor App Methodology
Complete Architecture Overview
┌───────────────────────────────────────────────────────────────┐
│              12-FACTOR CLOUD-NATIVE ARCHITECTURE              │
├───────────────────────────────────────────────────────────────┤
│                                                               │
│  ┌───────────────┐    ┌───────────────┐    ┌───────────────┐  │
│  │   CODEBASE    │    │ DEPENDENCIES  │    │    CONFIG     │  │
│  │  (Git Repo)   │    │   (Package)   │    │  (Env Vars)   │  │
│  └───────┬───────┘    └───────┬───────┘    └───────┬───────┘  │
│          │                    │                    │          │
│          ▼                    ▼                    ▼          │
│  ┌───────────────┐    ┌───────────────┐    ┌───────────────┐  │
│  │    BACKING    │    │     BUILD     │    │   PROCESSES   │  │
│  │   SERVICES    │    │    RELEASE    │    │  (Stateless)  │  │
│  └───────┬───────┘    └───────┬───────┘    └───────┬───────┘  │
│          │                    │                    │          │
│          ▼                    ▼                    ▼          │
│  ┌───────────────┐    ┌───────────────┐    ┌───────────────┐  │
│  │  CONCURRENCY  │    │ DISPOSABILITY │    │ PORT BINDING  │  │
│  │  (Scale Out)  │    │(Fast Startup) │    │    (HTTP)     │  │
│  └───────┬───────┘    └───────┬───────┘    └───────┬───────┘  │
│          │                    │                    │          │
│          ▼                    ▼                    ▼          │
│  ┌───────────────┐    ┌───────────────┐    ┌───────────────┐  │
│  │   DEV/PROD    │    │     LOGS      │    │     ADMIN     │  │
│  │    PARITY     │    │   (Events)    │    │   PROCESSES   │  │
│  └───────────────┘    └───────────────┘    └───────────────┘  │
│                                                               │
└───────────────────────────────────────────────────────────────┘
Mathematical Foundation: Capacity Planning
For a platform serving 100M+ users, we need to calculate resource requirements:
User Distribution Model:
- Total users: N = 100,000,000
- Daily active users: D = 0.3 × N = 30,000,000
- Requests per user per day: R = 50
- Total daily requests: Q_d = D × R = 1.5 × 10^9
Peak Load Calculation:
- Peak multiplier: M = 3x average
- Peak requests per second: Q_peak = (Q_d × M) / 86,400
- Q_peak = (1.5 × 10^9 × 3) / 86,400 ≈ 52,083 RPS
Resource Requirements:
- Average response time: T = 100ms
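- Throughput per instance, assuming each instance serves one request at a time: I = 1/T = 10 req/s
- Minimum instances needed: I_min = ceil(Q_peak / I) = ceil(52,083 / 10) = 5,209 instances
- Instances that serve requests concurrently need proportionally fewer; this figure is an upper bound for sizing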
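The arithmetic above can be reproduced with a short script. This is a minimal sketch: the function names and the `concurrency` parameter are illustrative assumptions, and the inputs are the figures from the model above.

```python
import math

SECONDS_PER_DAY = 86_400


def peak_rps(total_users: int, dau_ratio: float, requests_per_user: int,
             peak_multiplier: float) -> float:
    """Peak requests per second derived from the user distribution model."""
    daily_requests = total_users * dau_ratio * requests_per_user
    return daily_requests * peak_multiplier / SECONDS_PER_DAY


def min_instances(peak: float, response_time_s: float, concurrency: int = 1) -> int:
    """Instances needed when each one serves `concurrency` requests at a time."""
    per_instance_rps = concurrency / response_time_s
    return math.ceil(peak / per_instance_rps)


peak = peak_rps(100_000_000, 0.3, 50, 3)
print(round(peak))                    # 52083
print(min_instances(peak, 0.1))       # 5209 (one request at a time)
print(min_instances(peak, 0.1, 100))  # 53 (100 concurrent requests per instance)
```

The last line shows how sensitive the estimate is to the per-instance concurrency assumption, which is worth stating explicitly in an interview.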
Implementation: Factor 1 - Codebase
# AWS CodeCommit for source control
resource "aws_codecommit_repository" "cloud_native_app" {
  repository_name = "cloud-native-app"
  description     = "12-Factor cloud-native application"

  tags = {
    Environment = var.environment
    ManagedBy   = "Terraform"
  }
}

# CodePipeline for CI/CD
resource "aws_codepipeline" "main_pipeline" {
  name     = "cloud-native-app-pipeline"
  role_arn = aws_iam_role.pipeline_role.arn

  artifact_store {
    location = aws_s3_bucket.artifacts.bucket
    type     = "S3"
  }

  stage {
    name = "Source"

    action {
      name             = "Source"
      category         = "Source"
      owner            = "AWS"
      provider         = "CodeCommit"
      version          = "1"
      output_artifacts = ["source_output"]

      configuration = {
        RepositoryName       = aws_codecommit_repository.cloud_native_app.repository_name
        BranchName           = "main"
        PollForSourceChanges = false
      }
    }
  }

  stage {
    name = "Build"

    action {
      name             = "Build"
      category         = "Build"
      owner            = "AWS"
      provider         = "CodeBuild"
      input_artifacts  = ["source_output"]
      output_artifacts = ["build_output"]
      version          = "1"

      configuration = {
        ProjectName = aws_codebuild_project.build.name
      }
    }
  }
}
Factor 2 - Dependencies
# requirements.txt (Python example)
# All dependencies explicitly declared
# Core framework
fastapi==0.104.1
uvicorn[standard]==0.24.0
# Database drivers
psycopg2-binary==2.9.9
motor==3.3.2 # Async MongoDB driver
# AWS SDK
boto3==1.33.6
botocore==1.33.6
# Monitoring
prometheus-client==0.19.0
opentelemetry-api==1.21.0
opentelemetry-sdk==1.21.0
# Serialization
pydantic==2.5.2
orjson==3.9.10
# Testing
pytest==7.4.3
pytest-asyncio==0.23.2
httpx==0.25.2
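⚠️ Dependency Management
Never mix development and production dependencies in the same environment. Use virtual environments or containers to ensure isolation. Test-only packages such as pytest and pytest-asyncio in the list above would normally live in a separate development requirements file.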
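Explicit declaration is easy to check mechanically. The sketch below flags requirement lines that are not pinned to an exact version; the `unpinned` helper and its regular expression are illustrative assumptions that cover only simple `name==version` lines, not the full requirements-file syntax.

```python
import re

# Matches "name==version" with optional extras, e.g. "uvicorn[standard]==0.24.0"
PINNED = re.compile(r"^[A-Za-z0-9._-]+(\[[^\]]+\])?==[^=\s]+$")


def unpinned(requirements_text: str) -> list[str]:
    """Return requirement lines that are not pinned with '=='."""
    bad = []
    for raw in requirements_text.splitlines():
        line = raw.split("#", 1)[0].strip()  # drop comments and whitespace
        if line and not PINNED.match(line):
            bad.append(line)
    return bad


sample = """
# Core framework
fastapi==0.104.1
uvicorn[standard]==0.24.0
motor==3.3.2  # Async MongoDB driver
requests>=2.0
"""
print(unpinned(sample))  # ['requests>=2.0']
```

A check like this could run in CI so that an unpinned dependency fails the build before it reaches a release.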
Factor 3 - Configuration Management
# AWS Parameter Store for configuration
resource "aws_ssm_parameter" "app_config" {
  for_each = {
    "/app/database/host" = var.db_host
    "/app/database/port" = var.db_port
    "/app/redis/host"    = var.redis_host
    "/app/api/key"       = var.api_key
    "/app/feature/flags" = jsonencode(var.feature_flags)
  }

  name        = each.key
  description = "Application configuration parameter"
  type        = "SecureString"
  value       = each.value

  tags = {
    Environment = var.environment
  }
}
# ECS task definition with environment from Parameter Store
resource "aws_ecs_task_definition" "app" {
  family                   = "cloud-native-app"
  network_mode             = "awsvpc"
  requires_compatibilities = ["FARGATE"]
  cpu                      = "1024"
  memory                   = "2048"
  execution_role_arn       = aws_iam_role.ecs_execution.arn
  task_role_arn            = aws_iam_role.ecs_task.arn

  container_definitions = jsonencode([
    {
      name  = "app"
      image = "${aws_ecr_repository.app.repository_url}:latest"
      portMappings = [
        {
          containerPort = 8000
          hostPort      = 8000
          protocol      = "tcp"
        }
      ]
      # Non-sensitive settings are passed as plain environment variables
      environment = [
        {
          name  = "APP_ENV"
          value = var.environment
        },
        {
          name  = "APP_REGION"
          value = var.aws_region
        }
      ]
      # Sensitive settings are injected at launch from Parameter Store.
      # db_url and redis_url are assumed to be defined elsewhere; they are
      # not among the parameters created above.
      secrets = [
        {
          name      = "DATABASE_URL"
          valueFrom = aws_ssm_parameter.db_url.arn
        },
        {
          name      = "REDIS_URL"
          valueFrom = aws_ssm_parameter.redis_url.arn
        }
      ]
      logConfiguration = {
        logDriver = "awslogs"
        options = {
          "awslogs-group"         = aws_cloudwatch_log_group.app.name
          "awslogs-region"        = var.aws_region
          "awslogs-stream-prefix" = "app"
        }
      }
    }
  ])
}
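On the application side, the container only sees environment variables. A minimal sketch of reading them follows; the variable names match the task definition above, while the `Settings` class, the `load_settings` helper, and the default values for `APP_ENV` and `APP_REGION` are assumptions for illustration.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    app_env: str
    app_region: str
    database_url: str
    redis_url: str


REQUIRED = ("DATABASE_URL", "REDIS_URL")


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build Settings from environment variables, failing fast if any are missing."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError("Missing required config: " + ", ".join(missing))
    return Settings(
        app_env=env.get("APP_ENV", "dev"),              # assumed default
        app_region=env.get("APP_REGION", "us-east-1"),  # assumed default
        database_url=env["DATABASE_URL"],
        redis_url=env["REDIS_URL"],
    )


# Passing a mapping explicitly keeps the loader easy to test
settings = load_settings({
    "APP_ENV": "staging",
    "DATABASE_URL": "postgresql://db.internal:5432/app",
    "REDIS_URL": "redis://cache.internal:6379/0",
})
print(settings.app_env)  # staging
```

Failing at startup on missing config keeps a misconfigured release from serving traffic, and the same image runs in every environment because nothing environment-specific is baked into it.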
Factor 4 - Backing Services
# Service abstraction layer
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class ServiceType(Enum):
    DATABASE = "database"
    CACHE = "cache"
    QUEUE = "queue"
    STORAGE = "storage"


@dataclass
class ServiceConfig:
    host: str
    port: int
    credentials: Optional[dict] = None
    options: Optional[dict] = None


class BackingService(ABC):
    """Abstract base class for all backing services"""

    @abstractmethod
    async def connect(self) -> None:
        pass

    @abstractmethod
    async def disconnect(self) -> None:
        pass

    @abstractmethod
    async def health_check(self) -> bool:
        pass


class PostgreSQLService(BackingService):
    def __init__(self, config: ServiceConfig):
        self.config = config
        self.pool = None

    async def connect(self) -> None:
        from psycopg2 import pool

        # psycopg2 is synchronous, so build the pool in a worker thread
        # instead of blocking the event loop
        self.pool = await asyncio.to_thread(
            pool.ThreadedConnectionPool,
            minconn=5,
            maxconn=20,
            host=self.config.host,
            port=self.config.port,
            **(self.config.credentials or {}),
        )

    async def disconnect(self) -> None:
        if self.pool:
            await asyncio.to_thread(self.pool.closeall)

    def _ping(self) -> bool:
        conn = self.pool.getconn()
        try:
            with conn.cursor() as cursor:
                cursor.execute("SELECT 1")
            return True
        finally:
            # Always return the connection, even if the query fails
            self.pool.putconn(conn)

    async def health_check(self) -> bool:
        try:
            return await asyncio.to_thread(self._ping)
        except Exception:
            return False


class RedisService(BackingService):
    def __init__(self, config: ServiceConfig):
        self.config = config
        self.client = None

    async def connect(self) -> None:
        import redis.asyncio as redis

        self.client = redis.Redis(
            host=self.config.host,
            port=self.config.port,
            **(self.config.options or {}),  # options may be None
        )

    async def disconnect(self) -> None:
        if self.client:
            await self.client.close()

    async def health_check(self) -> bool:
        try:
            await self.client.ping()
            return True
        except Exception:
            return False


class ServiceManager:
    """Manages all backing services"""

    def __init__(self):
        self.services: dict[str, BackingService] = {}

    def register(self, name: str, service: BackingService):
        self.services[name] = service

    async def connect_all(self):
        await asyncio.gather(
            *[service.connect() for service in self.services.values()]
        )

    async def disconnect_all(self):
        await asyncio.gather(
            *[service.disconnect() for service in self.services.values()]
        )

    async def health_check_all(self) -> dict[str, bool]:
        results = {}
        for name, service in self.services.items():
            results[name] = await service.health_check()
        return results
✅ Best Practice
Backing services should be treated as attached resources. The application should be able to connect to any backing service (local or cloud) without code changes.
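One common way to make a resource swappable without code changes is to describe it with a single URL taken from config. The sketch below parses such a URL with the standard library; the `AttachedResource` class, the `parse_resource_url` helper, the default-port table, and the example hostnames are all hypothetical.

```python
from dataclasses import dataclass
from typing import Optional
from urllib.parse import urlsplit

# Assumed defaults used only when the URL omits a port
DEFAULT_PORTS = {"postgresql": 5432, "redis": 6379}


@dataclass(frozen=True)
class AttachedResource:
    scheme: str
    host: str
    port: int
    username: Optional[str]
    password: Optional[str]
    name: str  # database name or index, taken from the URL path


def parse_resource_url(url: str) -> AttachedResource:
    """Turn a resource URL from config into connection settings."""
    parts = urlsplit(url)
    if not parts.hostname:
        raise ValueError(f"No host in resource URL: {url!r}")
    port = parts.port or DEFAULT_PORTS.get(parts.scheme)
    if port is None:
        raise ValueError(f"No port given and no default for {parts.scheme!r}")
    return AttachedResource(
        scheme=parts.scheme,
        host=parts.hostname,
        port=port,
        username=parts.username,
        password=parts.password,
        name=parts.path.lstrip("/"),
    )


# Swapping a local database for a managed one is a config change only
local = parse_resource_url("postgresql://app:secret@localhost:5432/orders")
managed = parse_resource_url("postgresql://app:secret@db.example.internal/orders")
print(local.host, local.port)      # localhost 5432
print(managed.host, managed.port)  # db.example.internal 5432
```

The calling code is identical in both cases; only the URL differs, which is the property the attached-resource principle asks for.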
Factor 5 - Build, Release, Run
# Build pipeline configuration
# buildspec.yml for AWS CodeBuild
version: 0.2

phases:
  pre_build:
    commands:
      - echo Logging in to Amazon ECR...
      - aws ecr get-login-password --region $AWS_DEFAULT_REGION | docker login --username AWS --password-stdin $AWS_ACCOUNT_ID.dkr.ecr.$AWS_DEFAULT_REGION.amazonaws.com
      - COMMIT_HASH=$(echo $CODEBUILD_RESOLVED_SOURCE_VERSION | cut -c 1-7)
      - IMAGE_TAG=${COMMIT_HASH:=latest}
  build:
    commands:
      - echo Build started on `date`
      - echo Building the Docker image...
      - docker build -t $REPOSITORY_URI:latest .
      - docker tag $REPOSITORY_URI:latest $REPOSITORY_URI:$IMAGE_TAG
  post_build:
    commands:
      - echo Build completed on `date`
      - docker push $REPOSITORY_URI:latest
      - docker push $REPOSITORY_URI:$IMAGE_TAG
      - echo Writing image definition file...
      - printf '[{"name":"app","imageUri":"%s"}]' $REPOSITORY_URI:$IMAGE_TAG > imagedefinitions.json

artifacts:
  files:
    - imagedefinitions.json
    - cloudformation/**/*

cache:
  paths:
    - '/root/.cache/pip'
    - '/root/.docker'
# Release management with AWS ECS
resource "aws_ecs_service" "app" {
  name            = "cloud-native-app"
  cluster         = aws_ecs_cluster.main.id
  task_definition = aws_ecs_task_definition.app.arn
  desired_count   = var.desired_count
  launch_type     = "FARGATE"

  # Rolling-deployment limits are top-level arguments on aws_ecs_service
  deployment_maximum_percent         = 200
  deployment_minimum_healthy_percent = 100

  # Roll back automatically if the new tasks fail to stabilize
  deployment_circuit_breaker {
    enable   = true
    rollback = true
  }

  network_configuration {
    security_groups  = [aws_security_group.app.id]
    subnets          = aws_subnet.private[*].id
    assign_public_ip = false
  }

  load_balancer {
    target_group_arn = aws_lb_target_group.app.arn
    container_name   = "app"
    container_port   = 8000
  }

  depends_on = [aws_lb_listener.https]
}
Factor 6 - Processes
# Stateless process design
import uuid
from datetime import datetime, timezone

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()


class SessionData(BaseModel):
    user_id: str
    preferences: dict
    timestamp: datetime


class ProcessManager:
    """Manages stateless processes"""

    def __init__(self):
        self.process_id = str(uuid.uuid4())
        # datetime.utcnow() is deprecated; use a timezone-aware timestamp
        self.start_time = datetime.now(timezone.utc)

    def get_process_info(self) -> dict:
        return {
            "process_id": self.process_id,
            "start_time": self.start_time.isoformat(),
            "uptime_seconds": (datetime.now(timezone.utc) - self.start_time).total_seconds(),
        }


# No state stored in process - all state in external services
@app.post("/api/v1/sessions")
async def create_session(user_id: str):
    session_id = str(uuid.uuid4())
    # Store in Redis, not in process memory
    session_data = SessionData(
        user_id=user_id,
        preferences={},
        timestamp=datetime.now(timezone.utc),
    )
    # Pydantic v2 (pinned above) serializes with model_dump_json()
    # await redis.set(f"session:{session_id}", session_data.model_dump_json())
    return {"session_id": session_id}


@app.get("/api/v1/sessions/{session_id}")
async def get_session(session_id: str):
    # Retrieve from external store, not from process memory
    # session = await redis.get(f"session:{session_id}")
    # if not session:
    #     raise HTTPException(status_code=404, detail="Session not found")
    # return SessionData.model_validate_json(session)
    return {"session_id": session_id, "data": {}}
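To make the stateless property concrete, the sketch below routes all session reads and writes through a store interface, so any worker can serve any request. `FakeExternalStore` is an in-memory stand-in for Redis used only so the example runs on its own; it and `SessionService` are hypothetical names, not part of the application above.

```python
import json
import uuid
from typing import Optional, Protocol


class SessionStore(Protocol):
    def set(self, key: str, value: str) -> None: ...
    def get(self, key: str) -> Optional[str]: ...


class FakeExternalStore:
    """In-memory stand-in for an external store such as Redis (illustration only)."""

    def __init__(self) -> None:
        self._data: dict[str, str] = {}

    def set(self, key: str, value: str) -> None:
        self._data[key] = value

    def get(self, key: str) -> Optional[str]:
        return self._data.get(key)


class SessionService:
    """Holds no session data itself; everything goes through the store."""

    def __init__(self, store: SessionStore) -> None:
        self._store = store

    def create(self, user_id: str) -> str:
        session_id = str(uuid.uuid4())
        payload = json.dumps({"user_id": user_id, "preferences": {}})
        self._store.set(f"session:{session_id}", payload)
        return session_id

    def load(self, session_id: str) -> Optional[dict]:
        raw = self._store.get(f"session:{session_id}")
        return json.loads(raw) if raw is not None else None


# Two "processes" sharing one external store
store = FakeExternalStore()
worker_a = SessionService(store)
worker_b = SessionService(store)

sid = worker_a.create("user-42")
print(worker_b.load(sid))  # {'user_id': 'user-42', 'preferences': {}}
```

Because the session created by one worker is readable by the other, the load balancer needs no sticky sessions and any worker can be replaced at any time.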
Factor 7 - Port Binding
# Self-contained HTTP server
import os

import uvicorn
from fastapi import FastAPI

app = FastAPI()


@app.get("/health")
async def health():
    return {"status": "healthy"}


@app.get("/")
async def root():
    return {"message": "Hello from 12-Factor App"}


if __name__ == "__main__":
    # The port comes from the environment; 8000 is only a local default
    port = int(os.getenv("PORT", "8000"))
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=port,
        log_level="info",
    )
Factor 8 - Concurrency
# Process model: Scale out via processes
import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from typing import List, Optional


class ConcurrencyManager:
    """Manages process-based concurrency"""

    def __init__(self, max_workers: Optional[int] = None):
        self.max_workers = max_workers or multiprocessing.cpu_count()
        self.executor = ProcessPoolExecutor(max_workers=self.max_workers)

    async def process_task(self, task_func, *args):
        # get_running_loop() is the supported call inside a coroutine
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self.executor, task_func, *args)

    async def process_batch(self, tasks: List[tuple]) -> List:
        coroutines = [
            self.process_task(task_func, *args)
            for task_func, *args in tasks
        ]
        return await asyncio.gather(*coroutines)


# Coroutine model: cap in-flight I/O-bound work with a semaphore.
# Despite the class name, no OS threads are created here.
class ThreadManager:
    """Limits the number of concurrently running I/O-bound coroutines"""

    def __init__(self, max_workers: int = 50):
        self.max_workers = max_workers
        self.semaphore = asyncio.Semaphore(max_workers)

    async def limited_task(self, coro):
        async with self.semaphore:
            return await coro


# Horizontal scaling calculation
# For work that mixes CPU time and I/O wait:
#   Optimal processes = CPU cores × (1 + I/O wait time / CPU time)
#   (purely CPU-bound work has no wait time, so this reduces to one per core)
#
# For I/O-bound work (Little's Law, latency in milliseconds):
#   Optimal threads = (Total latency per request × Target RPS) / 1000
#
# Example:
#   Latency = 100 ms, Target RPS = 1000
#   Threads needed = (100 × 1000) / 1000 = 100 threads per instance
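The two sizing formulas can be written as small helpers. This is a sketch with hypothetical function names; the first applies Little's Law with latency in milliseconds, and the second applies the cores-times-wait-ratio rule of thumb.

```python
import math


def required_concurrency(latency_ms: float, target_rps: float) -> int:
    """Little's Law: requests in flight = arrival rate x time in system."""
    return math.ceil(latency_ms * target_rps / 1000)


def pool_size(cpu_cores: int, io_wait_ms: float, cpu_time_ms: float) -> int:
    """Workers needed to keep every core busy while others wait on I/O."""
    return math.ceil(cpu_cores * (1 + io_wait_ms / cpu_time_ms))


# 100 ms latency at 1000 RPS means about 100 requests in flight
print(required_concurrency(100, 1000))  # 100

# 4 cores, each request waits 90 ms on I/O and computes for 10 ms
print(pool_size(4, io_wait_ms=90, cpu_time_ms=10))  # 40
```

Both results are starting points for load testing rather than guarantees, since real latency varies with load.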
Factor 9 - Disposability
# Fast startup and graceful shutdown
import asyncio
import signal
from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi.responses import JSONResponse

shutdown_event = asyncio.Event()


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    print("Starting up...")
    # Initialize connections, load configs, etc.
    yield
    # Shutdown
    print("Shutting down gracefully...")
    shutdown_event.set()
    # Close connections, flush buffers, complete in-flight requests


app = FastAPI(lifespan=lifespan)


def signal_handler(signum, frame):
    print(f"Received signal {signum}, initiating graceful shutdown...")
    shutdown_event.set()


# Register signal handlers.
# Note: ASGI servers such as uvicorn install their own SIGTERM/SIGINT handlers
# at startup, which can replace these; the lifespan shutdown block above is the
# hook that runs in that case.
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)


@app.get("/health")
async def health():
    return {"status": "healthy"}


@app.get("/ready")
async def readiness():
    # Check if all dependencies are ready.
    # Return 503 once shutdown has begun so the load balancer stops routing here.
    if shutdown_event.is_set():
        return JSONResponse(status_code=503, content={"ready": False})
    return {"ready": True}
ℹ️ Disposability
Processes should be disposable: start up quickly and shut down gracefully. This enables rapid deployment, scaling, and recovery.
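Graceful shutdown usually means three steps: stop accepting new work, give in-flight work a bounded time to finish, then cancel whatever remains. The sketch below shows that sequence with plain asyncio; the `Drainer` class and the timeout values are illustrative assumptions, not part of FastAPI or uvicorn.

```python
import asyncio


class Drainer:
    """Tracks in-flight tasks so shutdown can wait for them."""

    def __init__(self) -> None:
        self._in_flight: set = set()
        self._closing = False

    def submit(self, coro) -> asyncio.Task:
        if self._closing:
            coro.close()  # avoid a "coroutine was never awaited" warning
            raise RuntimeError("shutting down; not accepting new work")
        task = asyncio.create_task(coro)
        self._in_flight.add(task)
        task.add_done_callback(self._in_flight.discard)
        return task

    async def drain(self, timeout: float) -> int:
        """Wait up to `timeout` seconds, then cancel stragglers. Returns how many were cancelled."""
        self._closing = True
        if not self._in_flight:
            return 0
        _, pending = await asyncio.wait(list(self._in_flight), timeout=timeout)
        for task in pending:
            task.cancel()
        # Let cancelled tasks unwind before returning
        await asyncio.gather(*pending, return_exceptions=True)
        return len(pending)


async def demo() -> int:
    drainer = Drainer()
    drainer.submit(asyncio.sleep(0.01))  # finishes within the grace period
    drainer.submit(asyncio.sleep(10))    # too slow, will be cancelled
    return await drainer.drain(timeout=0.1)


cancelled = asyncio.run(demo())
print(cancelled)  # 1
```

The grace period should be shorter than the orchestrator's own termination timeout, so the process exits on its own terms rather than being killed mid-request.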
Factor 10 - Dev/Prod Parity
# Infrastructure as Code ensures parity
module "dev_environment" {
  source            = "./modules/environment"
  environment       = "dev"
  instance_type     = "t3.medium"
  desired_count     = 2
  database_instance = "db.t3.micro"
  enable_monitoring = true
}

module "staging_environment" {
  source            = "./modules/environment"
  environment       = "staging"
  instance_type     = "t3.large"
  desired_count     = 4
  database_instance = "db.t3.small"
  enable_monitoring = true
}

module "production_environment" {
  source            = "./modules/environment"
  environment       = "production"
  instance_type     = "c5.xlarge"
  desired_count     = 10
  database_instance = "db.r5.large"
  enable_monitoring = true
}
# Feature flags for parity
from typing import Any, Dict, Optional


class FeatureFlag:
    """Feature flags for dev/prod parity"""

    def __init__(self, name: str, enabled: bool = False, config: Optional[Dict[str, Any]] = None):
        self.name = name
        self.enabled = enabled
        self.config = config or {}

    def is_enabled(self) -> bool:
        return self.enabled

    def get_config(self) -> Dict[str, Any]:
        return self.config


class FeatureManager:
    """Manages feature flags across environments"""

    def __init__(self):
        self.flags: Dict[str, FeatureFlag] = {}

    def register(self, flag: FeatureFlag):
        self.flags[flag.name] = flag

    def is_enabled(self, flag_name: str) -> bool:
        # Unknown flags are treated as disabled
        flag = self.flags.get(flag_name)
        return flag.is_enabled() if flag else False


# Example usage
feature_manager = FeatureManager()
feature_manager.register(FeatureFlag("new_checkout_flow", enabled=True))
feature_manager.register(FeatureFlag("advanced_analytics", enabled=True, config={"sample_rate": 0.1}))
Factor 11 - Logs
# Structured logging as event streams
import json
import logging
import sys
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional

# Write the event stream to stdout; routing and storage belong to the platform
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")


class StructuredLogger:
    """Structured logging for cloud-native applications"""

    def __init__(self, service_name: str):
        self.service_name = service_name
        self.logger = logging.getLogger(service_name)

    def log_event(self, event_type: str, data: Dict[str, Any], level: str = "INFO",
                  trace_id: Optional[str] = None):
        log_entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "service": self.service_name,
            "event_type": event_type,
            "level": level,
            # Reuse the caller's trace ID when given so related events correlate
            "trace_id": trace_id or str(uuid.uuid4()),
            "data": data,
        }
        # Emit at the requested severity rather than always at INFO
        severity = getattr(logging, level.upper(), logging.INFO)
        self.logger.log(severity, json.dumps(log_entry))

    def log_request(self, request_id: str, method: str, path: str, status_code: int, duration_ms: float):
        self.log_event("http_request", {
            "request_id": request_id,
            "method": method,
            "path": path,
            "status_code": status_code,
            "duration_ms": duration_ms,
        })

    def log_error(self, error: Exception, context: Optional[Dict[str, Any]] = None):
        self.log_event("error", {
            "error_type": type(error).__name__,
            "error_message": str(error),
            "context": context or {},
        }, level="ERROR")


# Usage
logger = StructuredLogger("cloud-native-app")
logger.log_request("req-123", "GET", "/api/users", 200, 45.2)
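For log lines from one request to be correlated downstream, they need to share a trace ID. One way to do that without threading an argument through every call is a context variable read by the log formatter. The sketch below uses only the standard library; `trace_id_var`, `JsonFormatter`, `build_logger`, and `handle_request` are hypothetical names.

```python
import contextvars
import json
import logging
import sys
import uuid

# Holds the trace ID for the current request; "-" means no request is active
trace_id_var = contextvars.ContextVar("trace_id", default="-")


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "trace_id": trace_id_var.get(),
        })


def build_logger(name: str) -> logging.Logger:
    handler = logging.StreamHandler(sys.stdout)  # event stream goes to stdout
    handler.setFormatter(JsonFormatter())
    logger = logging.getLogger(name)
    logger.handlers = [handler]
    logger.setLevel(logging.INFO)
    logger.propagate = False  # avoid duplicate lines via the root logger
    return logger


def handle_request(logger: logging.Logger, trace_id=None) -> None:
    """Every line logged inside this call carries the same trace ID."""
    token = trace_id_var.set(trace_id or str(uuid.uuid4()))
    try:
        logger.info("request started")
        logger.info("request finished")
    finally:
        trace_id_var.reset(token)


demo_logger = build_logger("trace-demo")
handle_request(demo_logger, trace_id="abc-123")
```

Context variables are copied per asyncio task, so concurrent requests in the same process keep separate trace IDs.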
Factor 12 - Admin Processes
# One-off administrative tasks
import asyncio


class AdminProcess:
    """Handles one-off administrative tasks"""

    @staticmethod
    async def migrate_database():
        """Run database migrations"""
        print("Running database migrations...")
        # Implement migration logic

    @staticmethod
    async def seed_data():
        """Seed initial data"""
        print("Seeding initial data...")
        # Implement seeding logic

    @staticmethod
    async def cleanup_old_data():
        """Clean up old data"""
        print("Cleaning up old data...")
        # Implement cleanup logic


# Run admin tasks
async def run_admin_task(task_name: str):
    tasks = {
        "migrate": AdminProcess.migrate_database,
        "seed": AdminProcess.seed_data,
        "cleanup": AdminProcess.cleanup_old_data,
    }
    if task_name in tasks:
        await tasks[task_name]()
    else:
        # Exit non-zero so the scheduler or CI job sees the failure
        raise SystemExit(f"Unknown task: {task_name}")


# Execute via CLI or container, using the same image and config as the app
if __name__ == "__main__":
    import sys

    if len(sys.argv) > 1:
        asyncio.run(run_admin_task(sys.argv[1]))
✅ 12-Factor Summary
Following the 12-Factor App methodology ensures your application is portable, scalable, and resilient. Each factor addresses a specific aspect of cloud-native development.
Summary Table
| Factor | Description | Implementation |
|---|---|---|
| I. Codebase | One codebase in version control | Git + CodeCommit |
| II. Dependencies | Explicitly declare dependencies | requirements.txt |
| III. Config | Store config in environment | Parameter Store |
| IV. Backing Services | Treat as attached resources | Service abstraction |
| V. Build/Release/Run | Strictly separate build, release, and run stages | CI/CD pipeline |
| VI. Processes | Stateless processes | Redis for state |
| VII. Port Binding | Export services via port binding | uvicorn on PORT |
| VIII. Concurrency | Scale via process model | Horizontal scaling |
| IX. Disposability | Fast startup, graceful shutdown | Signal handlers |
| X. Dev/Prod Parity | Keep environments similar | Terraform modules |
| XI. Logs | Treat logs as event streams | Structured logging |
| XII. Admin | Run as one-off processes | Management scripts |