API Design with FastAPI/Flask
Difficulty: Medium-Hard | Companies: Google, Meta, Amazon, Netflix, Stripe
FastAPI Fundamentals
from fastapi import FastAPI, HTTPException, Depends, status, Query, Path
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, EmailStr, Field
from typing import Optional, List
from datetime import datetime
from enum import Enum
import jwt
# Application instance for this example; the title and version are passed
# straight to the FastAPI constructor.
app = FastAPI(title="User API", version="1.0.0")
# Data Models
class UserRole(str, Enum):
    """Closed set of user roles; each member is also a ``str`` instance."""

    ADMIN = "admin"
    USER = "user"
    MODERATOR = "moderator"
class UserBase(BaseModel):
    """Fields shared by the user creation and user response schemas."""

    email: EmailStr
    # Required field; length is constrained to 2-50 characters.
    name: str = Field(..., min_length=2, max_length=50)
    # Falls back to the regular "user" role when the client omits it.
    role: UserRole = UserRole.USER
class UserCreate(UserBase):
    """Request body for creating a user: the base fields plus a password."""

    # Required field with a minimum length of 8 characters.
    password: str = Field(..., min_length=8)
class UserResponse(UserBase):
    """User representation returned by the API (never includes the password)."""

    id: int
    created_at: datetime
    is_active: bool = True

    class Config:
        # NOTE(review): presumably the pydantic v2 switch that allows building
        # the model from attribute-bearing (ORM-style) objects - confirm
        # against the installed pydantic version.
        from_attributes = True
class UserUpdate(BaseModel):
    """Partial-update payload for a user.

    Every field is optional; a field left as ``None`` means "do not change".
    When a name is supplied it must satisfy the same 2-50 character length
    rule that ``UserBase`` enforces on creation, so an update cannot introduce
    a name that creation would have rejected.
    """

    name: Optional[str] = Field(None, min_length=2, max_length=50)
    email: Optional[EmailStr] = None
    role: Optional[UserRole] = None
class PaginatedResponse(BaseModel):
    """One page of users plus the pagination metadata describing it."""

    # The users on the current page.
    items: List[UserResponse]
    # Pagination counters; the values are supplied by the endpoint.
    total: int
    page: int
    per_page: int
    pages: int
# Dependency Injection
security = HTTPBearer()


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> dict:
    """Resolve the authenticated user from the bearer JWT.

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.

    Returns:
        A dict with the token's ``sub`` claim under ``"id"`` and its ``role``
        claim (or ``None``) under ``"role"``.

    Raises:
        HTTPException: 401 when the token is expired, cannot be decoded or
            verified, or carries no ``sub`` claim.
    """
    try:
        payload = jwt.decode(
            credentials.credentials,
            # NOTE(review): hard-coded placeholder key - load the real secret
            # from configuration before using this outside a demo.
            "SECRET_KEY",
            algorithms=["HS256"],
        )
    except jwt.ExpiredSignatureError as err:
        # Must be handled before InvalidTokenError, which is its base class.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token expired",
        ) from err
    except jwt.InvalidTokenError as err:
        # Malformed tokens and bad signatures are client errors, not 500s.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
        ) from err

    user_id = payload.get("sub")
    if user_id is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
        )
    return {"id": user_id, "role": payload.get("role")}
def require_role(roles: List[UserRole]):
    """Build a dependency that only admits users holding one of ``roles``.

    The returned coroutine depends on ``get_current_user``; it returns the
    current user unchanged when the user's role matches, and raises a 403
    ``HTTPException`` otherwise.
    """

    async def role_checker(current_user: dict = Depends(get_current_user)):
        permitted_values = [allowed.value for allowed in roles]
        if current_user["role"] in permitted_values:
            return current_user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Insufficient permissions"
        )

    return role_checker
# API Endpoints
@app.post("/users/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(user: UserCreate):
    """Create a user and echo it back with a 201 status.

    Persistence is simulated: the id is always 1 and the creation time is the
    current local time. The submitted password is not part of the response.
    """
    simulated_record = {
        "id": 1,
        "email": user.email,
        "name": user.name,
        "role": user.role,
        "created_at": datetime.now(),
    }
    return UserResponse(**simulated_record)
@app.get("/users/", response_model=PaginatedResponse)
async def list_users(
    page: int = Query(1, ge=1),
    per_page: int = Query(10, ge=1, le=100),
    search: Optional[str] = None,
    role: Optional[UserRole] = None,
    current_user: dict = Depends(get_current_user),
):
    """List users with pagination and filtering (authenticated callers only).

    The data source is simulated: 100 users with ids 1-100, all holding the
    ``USER`` role. Filters are applied first, then the requested page is cut
    from the matching set, so ``total`` and ``pages`` always agree with
    ``page``/``per_page`` and with the returned ``items``.

    Args:
        page: 1-based page number.
        per_page: Page size, between 1 and 100.
        search: Optional case-insensitive substring matched against the
            simulated name and email.
        role: Optional role filter.
        current_user: Injected authenticated user (enforces authentication).

    Returns:
        A ``PaginatedResponse``; ``items`` is empty when ``page`` lies past
        the last page.
    """
    simulated_ids = range(1, 101)

    # Every simulated user has the USER role, so any other role matches nobody.
    if role is not None and role != UserRole.USER:
        matching_ids = []
    else:
        matching_ids = list(simulated_ids)

    if search:
        needle = search.lower()
        matching_ids = [
            i for i in matching_ids
            if needle in f"user {i}" or needle in f"user{i}@example.com"
        ]

    total = len(matching_ids)
    pages = (total + per_page - 1) // per_page  # ceiling division
    start = (page - 1) * per_page
    created_at = datetime.now()

    users = [
        UserResponse(
            id=i,
            email=f"user{i}@example.com",
            name=f"User {i}",
            role=UserRole.USER,
            created_at=created_at,
        )
        for i in matching_ids[start:start + per_page]
    ]
    return PaginatedResponse(
        items=users,
        total=total,
        page=page,
        per_page=per_page,
        pages=pages,
    )
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int = Path(..., gt=0)):
    """Return the simulated user with the given positive id.

    Ids above 100 are treated as unknown and produce a 404.
    """
    user_exists = user_id <= 100
    if not user_exists:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    simulated_record = {
        "id": user_id,
        "email": f"user{user_id}@example.com",
        "name": f"User {user_id}",
        "role": UserRole.USER,
        "created_at": datetime.now(),
    }
    return UserResponse(**simulated_record)
@app.put("/users/{user_id}", response_model=UserResponse)
async def update_user(
    user_id: int,
    user_update: UserUpdate,
    current_user: dict = Depends(require_role([UserRole.ADMIN]))
):
    """Update a user; only callers with the admin role are admitted.

    The update is simulated: each field that is missing (or falsy) in the
    payload falls back to a value derived from ``user_id``.
    """
    email = user_update.email
    if not email:
        email = f"user{user_id}@example.com"

    name = user_update.name
    if not name:
        name = f"User {user_id}"

    role = user_update.role
    if not role:
        role = UserRole.USER

    return UserResponse(
        id=user_id,
        email=email,
        name=name,
        role=role,
        created_at=datetime.now()
    )
@app.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
    user_id: int,
    current_user: dict = Depends(require_role([UserRole.ADMIN]))
):
    """Delete a user; only callers with the admin role are admitted.

    Deletion is simulated, so nothing happens and the 204 response is empty.
    """
    return None
ℹ️ Note:
FastAPI automatically generates OpenAPI documentation. Use response models and status codes for clear API contracts.
Flask API Patterns
from flask import Flask, request, jsonify, abort, g
from functools import wraps
from typing import Callable, Any
import jwt
from datetime import datetime, timedelta
from dataclasses import dataclass
from marshmallow import Schema, fields, validate, ValidationError
app = Flask(__name__)
# Key used below to sign and verify JWTs.
# NOTE(review): this is a placeholder value - supply the real secret from the
# environment or a secret store rather than committing it to source.
app.config['SECRET_KEY'] = 'your-secret-key'
# Schema Validation
class UserSchema(Schema):
    """Marshmallow schema describing a user for validation and output."""

    # ``id`` and ``created_at`` are declared dump_only.
    id = fields.Int(dump_only=True)
    email = fields.Email(required=True)
    # Required; length constrained to 2-50 characters.
    name = fields.Str(required=True, validate=validate.Length(min=2, max=50))
    # Optional; must be one of the three listed role names when present.
    role = fields.Str(validate=validate.OneOf(['admin', 'user', 'moderator']))
    created_at = fields.DateTime(dump_only=True)


# Shared schema instances: one for a single user, one for a list of users.
user_schema = UserSchema()
users_schema = UserSchema(many=True)
# Custom Decorators
def require_auth(f):
    """Wrap a view so it only runs for requests carrying a valid JWT.

    The token is read from the ``Authorization`` header. On success the
    decoded payload is stored on ``g.current_user`` and the view is called;
    otherwise a JSON error with status 401 is returned.
    """

    def _unauthorized(message):
        return jsonify({'error': message}), 401

    @wraps(f)
    def decorated(*args, **kwargs):
        auth_header = request.headers.get('Authorization', '')
        token = auth_header.replace('Bearer ', '')
        if not token:
            return _unauthorized('Token missing')

        try:
            g.current_user = jwt.decode(
                token, app.config['SECRET_KEY'], algorithms=['HS256']
            )
        except jwt.ExpiredSignatureError:
            return _unauthorized('Token expired')
        except jwt.InvalidTokenError:
            return _unauthorized('Invalid token')

        return f(*args, **kwargs)

    return decorated
def require_role(roles):
    """Wrap a view so it only runs for users holding one of ``roles``.

    Reads the decoded token payload from ``g.current_user``. If no payload
    has been stored (the view was not also wrapped with an authentication
    decorator, or it was applied in the wrong order), the request is rejected
    with 401 instead of failing with an ``AttributeError``.

    Args:
        roles: Collection of role names that are allowed to call the view.

    Returns:
        A decorator. The wrapped view returns a 401 JSON error when no user
        is attached to the request, a 403 JSON error when the user's role is
        not in ``roles``, and the view's own result otherwise.
    """
    def decorator(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            current_user = getattr(g, 'current_user', None)
            if current_user is None:
                return jsonify({'error': 'Authentication required'}), 401
            if current_user.get('role') not in roles:
                return jsonify({'error': 'Insufficient permissions'}), 403
            return f(*args, **kwargs)
        return decorated
    return decorator
def validate_request(schema):
    """Wrap a view so its JSON body is validated against ``schema`` first.

    The loaded data is stored on ``g.validated_data`` before the view runs.
    A validation failure short-circuits with the schema's error messages and
    status 400.
    """

    def decorator(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            try:
                g.validated_data = schema.load(request.get_json())
            except ValidationError as validation_error:
                error_body = {'errors': validation_error.messages}
                return jsonify(error_body), 400
            return f(*args, **kwargs)

        return decorated

    return decorator
# Error Handlers
@app.errorhandler(404)
def not_found(error):
    """Render 404 errors as a JSON body instead of the default page."""
    body = jsonify({'error': 'Not found'})
    return body, 404
@app.errorhandler(500)
def internal_error(error):
    """Render 500 errors as a JSON body instead of the default page."""
    body = jsonify({'error': 'Internal server error'})
    return body, 500
# API Endpoints
@app.route('/api/users', methods=['GET'])
@require_auth
def get_users():
    """Return one page of users (authenticated callers only).

    Query parameters:
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size; clamped to the range 1-100 so a client cannot
            request a negative or unbounded page.

    The data source is simulated (100 users, ids 1-100). The returned
    ``users`` list is the slice selected by ``page``/``per_page`` and is
    empty when the page lies past the end.
    """
    total = 100
    max_per_page = 100

    page = max(request.args.get('page', 1, type=int), 1)
    per_page = request.args.get('per_page', 10, type=int)
    per_page = min(max(per_page, 1), max_per_page)

    # Simulate database query for the requested window only.
    first_id = (page - 1) * per_page + 1
    last_id = min(first_id + per_page - 1, total)
    users = [
        {'id': i, 'email': f'user{i}@example.com', 'name': f'User {i}'}
        for i in range(first_id, last_id + 1)
    ]
    return jsonify({
        'users': users,
        'page': page,
        'per_page': per_page,
        'total': total
    })
@app.route('/api/users', methods=['POST'])
@validate_request(UserSchema())
def create_user():
    """Create a user from the validated request body and return it with 201.

    Persistence is simulated: the id is always 1 and ``created_at`` is the
    current local time in ISO format.
    """
    # Simulate database creation
    user = {'id': 1}
    user.update(g.validated_data)
    user['created_at'] = datetime.now().isoformat()
    return jsonify(user), 201
@app.route('/api/users/<int:user_id>', methods=['PUT'])
@require_auth
@require_role(['admin'])
@validate_request(UserSchema(partial=True))
def update_user(user_id):
    """Update a user from the validated (partial) body; admin role required.

    The update is simulated: the response echoes the id, the validated
    fields, and an ``updated_at`` timestamp in ISO format.
    """
    # Simulate database update
    user = {'id': user_id}
    user.update(g.validated_data)
    user['updated_at'] = datetime.now().isoformat()
    return jsonify(user)
@app.route('/api/users/<int:user_id>', methods=['DELETE'])
@require_auth
@require_role(['admin'])
def delete_user(user_id):
    """Delete a user; admin role required.

    Deletion is simulated, so the view only answers with an empty 204.
    """
    # Simulate database deletion
    empty_body = ''
    return empty_body, 204
# Authentication Endpoints
@app.route('/api/auth/login', methods=['POST'])
def login():
    """Authenticate a user and return a signed JWT.

    Expects a JSON object with ``email`` and ``password``. Authentication is
    simulated against a single hard-coded account.

    Returns:
        200 with ``{'token': ...}`` on success; the token is signed with
        HS256 and expires 24 hours after issue.
        400 when the body is missing, is not valid JSON, or is not an object.
        401 when the credentials do not match.
    """
    # Local import so the block does not depend on the file-level import list.
    from datetime import timezone

    # silent=True yields None instead of raising, so a bad body becomes a
    # clean JSON 400 rather than an AttributeError on ``data.get``.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Invalid request body'}), 400

    email = data.get('email')
    password = data.get('password')

    # Simulate authentication
    if email == 'user@example.com' and password == 'password':
        token = jwt.encode({
            # The JWT spec defines "sub" as a string; newer PyJWT releases
            # reject non-string subjects when the token is decoded.
            'sub': '1',
            'email': email,
            'role': 'user',
            # Timezone-aware replacement for the deprecated datetime.utcnow().
            'exp': datetime.now(timezone.utc) + timedelta(hours=24)
        }, app.config['SECRET_KEY'], algorithm='HS256')
        return jsonify({'token': token})

    return jsonify({'error': 'Invalid credentials'}), 401
API Versioning
from fastapi import FastAPI, APIRouter
from typing import Union
# Version 1
router_v1 = APIRouter(prefix="/api/v1")


@router_v1.get("/users/{user_id}")
async def get_user_v1(user_id: int):
    """V1 user endpoint: respond with only the id and display name."""
    basic_user = dict(id=user_id, name=f"User {user_id}")
    return basic_user
# Version 2
router_v2 = APIRouter(prefix="/api/v2")


@router_v2.get("/users/{user_id}")
async def get_user_v2(user_id: int):
    """V2 user endpoint: the V1 fields plus profile and metadata sections."""
    profile = {"avatar": "default.png", "bio": "Hello"}
    metadata = {"created_at": "2024-01-01"}
    enriched_user = {
        "id": user_id,
        "name": f"User {user_id}",
        "profile": profile,
        "metadata": metadata,
    }
    return enriched_user
# Register both versioned routers on one application so /api/v1 and /api/v2
# are served side by side.
app = FastAPI()
app.include_router(router_v1)
app.include_router(router_v2)
⚠️ Warning:
API versioning is crucial for backward compatibility. Use URL path versioning (/api/v1/) for clarity.
Rate Limiting and Caching
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from functools import wraps
import time
from typing import Dict, Callable
import hashlib
# Rate Limiter Middleware
class RateLimiter:
    """Sliding-window rate limiter keyed by client IP.

    For each client it keeps the timestamps of the requests admitted during
    the current window and refuses a request once that count reaches the
    limit. (This is a sliding-window log, not a token bucket.)

    NOTE: entries for clients that never return are not purged; run a
    periodic sweep if the set of client addresses is unbounded.
    """

    def __init__(self, requests_per_minute: int = 60, window_seconds: float = 60.0):
        """Configure the limiter.

        Args:
            requests_per_minute: Maximum requests admitted per window.
            window_seconds: Window length in seconds; the default of 60
                matches the parameter name above.
        """
        self.requests_per_minute = requests_per_minute
        self.window_seconds = window_seconds
        self.requests = {}

    def __call__(self, request: Request) -> bool:
        """Record the request and report whether it is within the limit.

        Args:
            request: Incoming request; its client address identifies the
                caller.

        Returns:
            ``True`` if the request was admitted (and recorded), ``False``
            if the client has reached its limit for the current window.
        """
        # The client address can be absent, so fall back to a shared bucket
        # instead of raising AttributeError.
        client = request.client
        client_ip = client.host if client is not None else "unknown"
        now = time.time()

        # Keep only the timestamps that still fall inside the window.
        recent = [
            stamp for stamp in self.requests.get(client_ip, ())
            if now - stamp < self.window_seconds
        ]
        allowed = len(recent) < self.requests_per_minute
        if allowed:
            recent.append(now)
        self.requests[client_ip] = recent
        return allowed
# Cache Decorator
def cache_response(ttl: int = 300, maxsize: int = 1024):
    """Cache the results of an async callable in memory.

    Each decorated function gets its own cache. The key is built from the
    function name and the ``repr`` of its arguments, with keyword arguments
    sorted by name so that ``f(a=1, b=2)`` and ``f(b=2, a=1)`` share an entry.

    Args:
        ttl: Seconds a cached result stays valid.
        maxsize: Maximum number of entries kept per function; the oldest
            entry is evicted first. ``0`` (or less) keeps nothing.

    Returns:
        A decorator producing an async wrapper with the wrapped function's
        metadata preserved.
    """
    def decorator(func: Callable):
        cache = {}

        @wraps(func)
        async def wrapper(*args, **kwargs):
            # sha256 rather than md5: the digest is only a compact key, and
            # md5 can be unavailable on FIPS-restricted builds.
            raw_key = f"{func.__name__}:{args!r}:{sorted(kwargs.items())!r}"
            key = hashlib.sha256(raw_key.encode()).hexdigest()

            entry = cache.get(key)
            if entry is not None:
                cached_time, cached_result = entry
                if time.time() - cached_time < ttl:
                    return cached_result
                # Drop the stale entry so it does not linger if the call fails.
                del cache[key]

            result = await func(*args, **kwargs)
            cache[key] = (time.time(), result)

            # Dicts preserve insertion order, so the first key is the oldest.
            while cache and len(cache) > maxsize:
                del cache[next(iter(cache))]
            return result

        return wrapper
    return decorator
# Usage
app = FastAPI()
# CORS Middleware
# Only the local development origin is allowed; credentials are permitted,
# and every method and header is accepted from that origin.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    """Attach the request's processing time to the response.

    Args:
        request: The incoming request.
        call_next: Callable that passes the request on and returns the
            response.

    Returns:
        The response, with an ``X-Process-Time`` header holding the elapsed
        time in seconds as a string.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed
    # (or go negative) when the system clock is adjusted mid-request.
    start_time = time.perf_counter()
    response = await call_next(request)
    process_time = time.perf_counter() - start_time
    response.headers["X-Process-Time"] = str(process_time)
    return response
@app.get("/api/data")
@cache_response(ttl=60)
async def get_data():
    """Return a stand-in for an expensive result; cached with ttl=60."""
    # Expensive computation
    payload = {"data": "expensive result"}
    return payload
Follow-Up Questions
- Explain the differences between REST and GraphQL.
- How do you handle API authentication and authorization?
- What are the best practices for API error handling?
- How do you design APIs for mobile clients with limited bandwidth?
- Explain the HATEOAS principle in REST APIs.