API Gateway Design: Rate Limiting, Auth, Caching

Difficulty: Senior Level | Companies: Netflix, Stripe, Twilio, AWS, Google Cloud

Interview Question

"Design an API Gateway handling 100,000 RPS with rate limiting, authentication, request/response transformation, and caching. How do you handle hotspots and ensure sub-10ms latency?"

โ„น๏ธKey Concepts

This question tests your understanding of API management, performance optimization, and scalable gateway patterns.

Complete API Gateway Architecture

Architecture Overview

Architecture Diagram
┌──────────────────────────────────────────────────────────────────────────┐
│                         API GATEWAY ARCHITECTURE                         │
├──────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  ┌──────────────────── CLIENT LAYER ────────────────────┐                │
│  │  Web Apps │ Mobile Apps │ IoT Devices │ Partners     │                │
│  └──────────────────────────┬───────────────────────────┘                │
│                             │                                            │
│  ┌───────────────────── EDGE LAYER ─────────────────────┐                │
│  │  CloudFront │ WAF │ Shield │ DDoS Protection         │                │
│  └──────────────────────────┬───────────────────────────┘                │
│                             │                                            │
│  ┌─────────────────── GATEWAY LAYER ────────────────────┐                │
│  │                                                      │                │
│  │  ┌──────────────────────────────────────────────┐    │                │
│  │  │                 API Gateway                  │    │                │
│  │  │                                              │    │                │
│  │  │  ┌──────────┐  ┌──────────┐  ┌──────────┐    │    │                │
│  │  │  │  Auth    │  │  Rate    │  │  Cache   │    │    │                │
│  │  │  │  Layer   │  │  Limiter │  │  Layer   │    │    │                │
│  │  │  └──────────┘  └──────────┘  └──────────┘    │    │                │
│  │  │                                              │    │                │
│  │  │  ┌──────────┐  ┌──────────┐  ┌──────────┐    │    │                │
│  │  │  │  Request │  │  Response│  │  Logging │    │    │                │
│  │  │  │ Transform│  │ Transform│  │  & Audit │    │    │                │
│  │  │  └──────────┘  └──────────┘  └──────────┘    │    │                │
│  │  │                                              │    │                │
│  │  └──────────────────────────────────────────────┘    │                │
│  │                                                      │                │
│  └──────────────────────────┬───────────────────────────┘                │
│                             │                                            │
│  ┌─────────────────── SERVICE LAYER ────────────────────┐                │
│  │  Microservices │ Lambda Functions │ External APIs    │                │
│  └──────────────────────────────────────────────────────┘                │
│                                                                          │
└──────────────────────────────────────────────────────────────────────────┘

Mathematical Foundation: Rate Limiting

Token Bucket Algorithm:

  • Bucket capacity: C tokens
  • Refill rate: R tokens/second
  • Time interval: T seconds
  • Tokens available: B(t) = min(C, B(t-1) + R × T)
  • Request allowed if: B(t) ≥ 1
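
The refill formula above can be traced with a small in-memory sketch. The class name, field names, and the numbers (C = 2, R = 1 token/second) are illustrative assumptions, and timestamps are passed in explicitly so the result is deterministic.

```python
from dataclasses import dataclass


@dataclass
class TokenBucket:
    capacity: float      # C: maximum tokens the bucket can hold
    refill_rate: float   # R: tokens added per second
    tokens: float        # B: tokens currently available
    last_refill: float   # timestamp of the previous update, in seconds

    def allow(self, now: float) -> bool:
        """Refill for the elapsed interval T, then try to spend one token."""
        elapsed = now - self.last_refill  # T
        self.tokens = min(self.capacity, self.tokens + self.refill_rate * elapsed)
        self.last_refill = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


# Hypothetical numbers: C = 2 tokens, R = 1 token/second, bucket starts full.
bucket = TokenBucket(capacity=2, refill_rate=1, tokens=2, last_refill=0.0)
decisions = [bucket.allow(t) for t in (0.0, 0.1, 0.2, 1.2)]
print(decisions)  # [True, True, False, True]
```

The first two requests spend the initial burst, the third arrives before a full token has been refilled, and the fourth succeeds once one second of refill has accumulated.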

Sliding Window Algorithm:

  • Window size: W seconds
  • Max requests: M
  • Current window count: N(t) = sum(requests in [t-W, t])
  • Request allowed if: N(t) < M
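
As a rough illustration of the window count N(t), the sketch below keeps a log of accepted request timestamps in memory. The class name and the numbers (W = 10 seconds, M = 3) are assumptions chosen for the example, not values from the design.

```python
from collections import deque


class SlidingWindowLog:
    def __init__(self, window_seconds: float, max_requests: int):
        self.window_seconds = window_seconds  # W
        self.max_requests = max_requests      # M
        self.timestamps = deque()             # accepted request times, oldest first

    def allow(self, now: float) -> bool:
        # Drop timestamps that have fallen out of the window [now - W, now].
        while self.timestamps and self.timestamps[0] < now - self.window_seconds:
            self.timestamps.popleft()
        if len(self.timestamps) < self.max_requests:  # N(t) < M
            self.timestamps.append(now)
            return True
        return False


# Hypothetical numbers: at most 3 requests in any 10-second window.
limiter = SlidingWindowLog(window_seconds=10, max_requests=3)
results = [limiter.allow(t) for t in (0, 1, 2, 3, 11.5)]
print(results)  # [True, True, True, False, True]
```

The request at t = 3 is rejected because three requests already sit in the window; by t = 11.5 the entries at t = 0 and t = 1 have aged out, so the request is accepted.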

Rate Limiting for 100K RPS:

  • Total capacity: 100,000 requests/second
  • Per-user limit: 100 requests/second
  • Concurrent users: 1,000
  • Buffer size: 10% = 10,000 requests
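
The figures above can be checked with a few lines of arithmetic. Treating the buffer as 10% of total capacity is an assumption about what the list intends; the constant names are illustrative.

```python
# Figures taken from the list above.
TOTAL_CAPACITY_RPS = 100_000
PER_USER_LIMIT_RPS = 100
CONCURRENT_USERS = 1_000
BUFFER_PERCENT = 10

# Worst case: every concurrent user sends at the full per-user limit.
peak_demand_rps = PER_USER_LIMIT_RPS * CONCURRENT_USERS

# Buffer expressed as a share of total capacity (assumed interpretation).
buffer_requests = TOTAL_CAPACITY_RPS * BUFFER_PERCENT // 100

print(f"peak demand: {peak_demand_rps} RPS")
print(f"buffer:      {buffer_requests} requests")
print(f"saturated:   {peak_demand_rps >= TOTAL_CAPACITY_RPS}")
```

With these numbers, 1,000 users at 100 requests/second each exactly saturate the 100,000 RPS capacity, so the per-user limit alone leaves no headroom beyond the buffer.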

API Gateway Implementation

AWS API Gateway Configuration

# API Gateway REST API
resource "aws_api_gateway_rest_api" "main" {
  name        = "main-api-gateway"
  description = "Main API Gateway for production services"

  endpoint_configuration {
    types = ["REGIONAL"]
  }

  policy = data.aws_iam_policy_document.api_gateway_policy.json
}

# Resource and methods
resource "aws_api_gateway_resource" "users" {
  rest_api_id = aws_api_gateway_rest_api.main.id
  parent_id   = aws_api_gateway_rest_api.main.root_resource_id
  path_part   = "users"
}

resource "aws_api_gateway_method" "get_users" {
  rest_api_id   = aws_api_gateway_rest_api.main.id
  resource_id   = aws_api_gateway_resource.users.id
  http_method   = "GET"
  authorization = "COGNITO_USER_POOLS"
  authorizer_id = aws_api_gateway_authorizer.cognito.id

  request_validator_id = aws_api_gateway_request_validator.query_params.id
  api_key_required     = true # usage plan throttling is applied per API key
}

# Integration with Lambda
resource "aws_api_gateway_integration" "users_lambda" {
  rest_api_id             = aws_api_gateway_rest_api.main.id
  resource_id             = aws_api_gateway_resource.users.id
  http_method             = aws_api_gateway_method.get_users.http_method
  integration_http_method = "POST"
  type                    = "AWS_PROXY"
  uri                     = aws_lambda_function.users.invoke_arn
}

# Request validator
resource "aws_api_gateway_request_validator" "query_params" {
  rest_api_id                 = aws_api_gateway_rest_api.main.id
  name                        = "validate-query-params"
  validate_request_body       = false
  validate_request_parameters = true
}

# API Key and usage plan for rate limiting
resource "aws_api_gateway_api_key" "client" {
  name    = "client-api-key"
  enabled = true
}

resource "aws_api_gateway_usage_plan" "standard" {
  name        = "standard-usage-plan"
  description = "Standard usage plan with rate limiting"

  api_stages {
    api_id = aws_api_gateway_rest_api.main.id
    stage  = aws_api_gateway_stage.production.stage_name
  }

  throttle_settings {
    burst_limit = 2000
    rate_limit  = 1000
  }

  quota_settings {
    limit  = 1000000
    period = "DAY"
  }
}

resource "aws_api_gateway_usage_plan_key" "client" {
  key_id        = aws_api_gateway_api_key.client.id
  key_type      = "API_KEY"
  usage_plan_id = aws_api_gateway_usage_plan.standard.id
}

# Custom domain
resource "aws_api_gateway_domain_name" "main" {
  domain_name              = "api.example.com"
  regional_certificate_arn = aws_acm_certificate.api.arn # REGIONAL endpoints use regional_certificate_arn

  endpoint_configuration {
    types = ["REGIONAL"]
  }
}

resource "aws_api_gateway_base_path_mapping" "main" {
  domain_name = aws_api_gateway_domain_name.main.domain_name
  api_id      = aws_api_gateway_rest_api.main.id
  stage_name  = aws_api_gateway_stage.production.stage_name
}

Rate Limiting Implementation

# Advanced rate limiting with multiple strategies
import time
from typing import Dict, Optional
from dataclasses import dataclass
from collections import defaultdict
import redis
from enum import Enum

class RateLimitStrategy(Enum):
    TOKEN_BUCKET = "token_bucket"
    SLIDING_WINDOW = "sliding_window"
    FIXED_WINDOW = "fixed_window"
    LEAKY_BUCKET = "leaky_bucket"

@dataclass
class RateLimitConfig:
    requests_per_second: int
    burst_capacity: int
    strategy: RateLimitStrategy = RateLimitStrategy.TOKEN_BUCKET
    window_size: int = 60  # seconds

class RateLimiter:
    """Multi-strategy rate limiter"""

    def __init__(self, redis_client: redis.Redis, config: RateLimitConfig):
        self.redis = redis_client
        self.config = config

    def is_allowed(self, client_id: str) -> bool:
        """Check if request is allowed"""
        if self.config.strategy == RateLimitStrategy.TOKEN_BUCKET:
            return self._token_bucket(client_id)
        elif self.config.strategy == RateLimitStrategy.SLIDING_WINDOW:
            return self._sliding_window(client_id)
        elif self.config.strategy == RateLimitStrategy.FIXED_WINDOW:
            return self._fixed_window(client_id)
        elif self.config.strategy == RateLimitStrategy.LEAKY_BUCKET:
            return self._leaky_bucket(client_id)
        return False

    def _token_bucket(self, client_id: str) -> bool:
        """Token bucket algorithm.

        Note: this read-modify-write is not atomic. With several gateway
        instances, run it inside a Lua script or a WATCH/MULTI transaction.
        Assumes a Redis client without decode_responses (bytes field names).
        """
        key = f"rate_limit:token_bucket:{client_id}"
        now = time.time()

        # Get current bucket state
        bucket = self.redis.hgetall(key)

        if not bucket:
            # New client: start with a full bucket
            tokens = float(self.config.burst_capacity)
        else:
            tokens = float(bucket[b'tokens'])
            last_refill = float(bucket[b'last_refill'])

            # Refill tokens for the time elapsed since the last update
            elapsed = now - last_refill
            refill_amount = elapsed * self.config.requests_per_second
            tokens = min(self.config.burst_capacity, tokens + refill_amount)

        allowed = tokens >= 1
        if allowed:
            tokens -= 1  # Consume token

        # Persist state on every call and refresh the TTL, so an active
        # bucket is not reset to full when the key expires
        self.redis.hset(key, mapping={'tokens': tokens, 'last_refill': now})
        self.redis.expire(key, 60)
        return allowed

    def _sliding_window(self, client_id: str) -> bool:
        """Sliding window algorithm"""
        key = f"rate_limit:sliding_window:{client_id}"
        now = time.time()
        window_start = now - self.config.window_size

        # Use Redis sorted set
        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(key, 0, window_start)
        pipe.zcard(key)
        pipe.zadd(key, {str(now): now})
        pipe.expire(key, self.config.window_size)
        results = pipe.execute()

        current_count = results[1]
        return current_count < self.config.requests_per_second * self.config.window_size

    def _fixed_window(self, client_id: str) -> bool:
        """Fixed window algorithm"""
        current_window = int(time.time() / self.config.window_size)
        key = f"rate_limit:fixed_window:{client_id}:{current_window}"

        current_count = self.redis.incr(key)
        if current_count == 1:
            self.redis.expire(key, self.config.window_size)

        return current_count <= self.config.requests_per_second * self.config.window_size

    def _leaky_bucket(self, client_id: str) -> bool:
        """Leaky bucket algorithm"""
        key = f"rate_limit:leaky_bucket:{client_id}"
        now = time.time()

        bucket = self.redis.hgetall(key)

        if not bucket:
            self.redis.hset(key, mapping={
                'water': 0,
                'last_leak': now
            })
            self.redis.expire(key, 60)
            water = 0
        else:
            water = float(bucket[b'water'])
            last_leak = float(bucket[b'last_leak'])

            # Leak water
            elapsed = now - last_leak
            leaked = elapsed * self.config.requests_per_second
            water = max(0, water - leaked)

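        if water < self.config.burst_capacity:
            self.redis.hset(key, mapping={
                'water': water + 1,
                'last_leak': now
            })
            # Refresh the TTL so an active bucket is not emptied when the key expires
            self.redis.expire(key, 60)
            return True
        return False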

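    def get_retry_after(self, client_id: str) -> Optional[int]:
        """Get seconds until next request is allowed (token bucket only)"""
        if self.config.strategy == RateLimitStrategy.TOKEN_BUCKET:
            key = f"rate_limit:token_bucket:{client_id}"
            bucket = self.redis.hgetall(key)

            if bucket:
                # Account for tokens refilled since the stored timestamp
                elapsed = time.time() - float(bucket[b'last_refill'])
                tokens = float(bucket[b'tokens']) + elapsed * self.config.requests_per_second
                if tokens < 1:
                    wait = (1 - tokens) / self.config.requests_per_second
                    # Round up: truncating would usually report 0 seconds
                    return int(wait) + 1
        return None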

โš ๏ธRate Limiting

Use distributed rate limiting with Redis for multi-instance deployments. Consider different limits for different API endpoints and user tiers.
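
One way to express per-tier and per-endpoint limits is a lookup table resolved before the limiter runs. This is a minimal sketch: the tier names, paths, numbers, and the helpers `resolve_limit` and `limiter_key` are all hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass(frozen=True)
class Limit:
    requests_per_second: int
    burst_capacity: int


# Hypothetical tiers and numbers, for illustration only.
TIER_DEFAULTS: Dict[str, Limit] = {
    "free": Limit(10, 20),
    "pro": Limit(100, 200),
}

# Endpoint-specific overrides keyed by (tier, path).
ENDPOINT_OVERRIDES: Dict[Tuple[str, str], Limit] = {
    ("free", "/search"): Limit(2, 5),
}


def resolve_limit(tier: str, path: str) -> Limit:
    """Prefer an endpoint override, then the tier default, then the free tier."""
    override = ENDPOINT_OVERRIDES.get((tier, path))
    if override is not None:
        return override
    return TIER_DEFAULTS.get(tier, TIER_DEFAULTS["free"])


def limiter_key(client_id: str, path: str) -> str:
    """Scope counters per client and endpoint so limits do not interfere."""
    return f"rate_limit:{client_id}:{path}"


print(resolve_limit("free", "/search"))  # Limit(requests_per_second=2, burst_capacity=5)
print(resolve_limit("pro", "/search"))   # Limit(requests_per_second=100, burst_capacity=200)
```

The resolved values could populate a `RateLimitConfig` per request, with `limiter_key` supplying the identifier passed to `is_allowed`.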

Authentication & Authorization

# JWT authentication middleware
import jwt
from datetime import datetime, timedelta
from typing import Dict, Any, Optional
from dataclasses import dataclass
import hashlib
import hmac

@dataclass
class JWTConfig:
    secret_key: str
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30
    refresh_token_expire_days: int = 7

class JWTAuthenticator:
    """JWT authentication handler"""

    def __init__(self, config: JWTConfig):
        self.config = config

    def create_access_token(self, data: Dict[str, Any]) -> str:
        """Create JWT access token"""
        to_encode = data.copy()
        expire = datetime.utcnow() + timedelta(
            minutes=self.config.access_token_expire_minutes
        )
        to_encode.update({
            "exp": expire,
            "iat": datetime.utcnow(),
            "type": "access"
        })
        return jwt.encode(
            to_encode,
            self.config.secret_key,
            algorithm=self.config.algorithm
        )

    def create_refresh_token(self, data: Dict[str, Any]) -> str:
        """Create JWT refresh token"""
        to_encode = data.copy()
        expire = datetime.utcnow() + timedelta(
            days=self.config.refresh_token_expire_days
        )
        to_encode.update({
            "exp": expire,
            "iat": datetime.utcnow(),
            "type": "refresh"
        })
        return jwt.encode(
            to_encode,
            self.config.secret_key,
            algorithm=self.config.algorithm
        )

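    def verify_token(self, token: str, expected_type: str = "access") -> Optional[Dict[str, Any]]:
        """Verify JWT signature, expiry, and token type"""
        try:
            payload = jwt.decode(
                token,
                self.config.secret_key,
                algorithms=[self.config.algorithm]
            )
        except jwt.InvalidTokenError:
            # Also covers ExpiredSignatureError, which subclasses InvalidTokenError
            return None

        # Reject refresh tokens presented as access tokens (and vice versa)
        if payload.get("type") != expected_type:
            return None
        return payload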

import secrets  # stdlib CSPRNG for unpredictable keys


class APIKeyManager:
    """API key management"""

    def __init__(self):
        self.keys: Dict[str, Dict[str, Any]] = {}

    def generate_api_key(self, client_id: str, permissions: list) -> str:
        """Generate new API key"""
        # A hash of client_id plus a timestamp is guessable; use random bytes.
        # In production, store only a hash of the key, not the key itself.
        api_key = secrets.token_urlsafe(32)

        self.keys[api_key] = {
            'client_id': client_id,
            'permissions': permissions,
            'created_at': datetime.utcnow(),
            'active': True
        }

        return api_key

    def validate_api_key(self, api_key: str) -> Optional[Dict[str, Any]]:
        """Validate API key"""
        key_data = self.keys.get(api_key)
        if key_data and key_data['active']:
            return key_data
        return None

    def revoke_api_key(self, api_key: str) -> bool:
        """Revoke API key"""
        if api_key in self.keys:
            self.keys[api_key]['active'] = False
            return True
        return False
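
The `permissions` list stored with each key can drive a simple authorization check once the caller is authenticated. The sketch below assumes a role-based scheme; the role names, the `resource:action` permission strings, the optional `roles` field, and both helper functions are hypothetical.

```python
from typing import Any, Dict, Iterable, Set

# Hypothetical role-to-permission mapping.
ROLE_PERMISSIONS: Dict[str, Set[str]] = {
    "viewer": {"users:read"},
    "admin": {"users:read", "users:write"},
}


def permissions_for(roles: Iterable[str]) -> Set[str]:
    """Union of the permissions granted by each role; unknown roles grant nothing."""
    granted: Set[str] = set()
    for role in roles:
        granted |= ROLE_PERMISSIONS.get(role, set())
    return granted


def is_authorized(principal: Dict[str, Any], required: str) -> bool:
    """Allow when the permission is granted directly or through a role."""
    direct = set(principal.get("permissions", []))
    from_roles = permissions_for(principal.get("roles", []))
    return required in (direct | from_roles)


# A principal shaped like the key data above, plus an assumed 'roles' field.
key_data = {"client_id": "client-1", "permissions": ["orders:read"], "roles": ["viewer"]}
print(is_authorized(key_data, "users:read"))   # True
print(is_authorized(key_data, "users:write"))  # False
```

Denying by default keeps an unknown role or a missing field from granting access by accident.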

Request/Response Transformation

# Request and response transformation
from typing import Dict, Any, List, Callable, Optional
from dataclasses import dataclass
import json
import re

@dataclass
class TransformRule:
    source_field: str
    target_field: str
    transform: Optional[Callable[[Any], Any]] = None  # applied to the value when set
    required: bool = False

class RequestTransformer:
    """Transform API requests"""

    def __init__(self):
        self.rules: Dict[str, List[TransformRule]] = {}

    def add_rule(self, endpoint: str, rule: TransformRule):
        if endpoint not in self.rules:
            self.rules[endpoint] = []
        self.rules[endpoint].append(rule)

    def transform(self, endpoint: str, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """Apply transformation rules"""
        rules = self.rules.get(endpoint, [])
        transformed = {}

        for rule in rules:
            value = request_data.get(rule.source_field)

            if rule.required and value is None:
                raise ValueError(f"Required field missing: {rule.source_field}")

            if value is not None:
                if rule.transform:
                    value = rule.transform(value)
                transformed[rule.target_field] = value

        return transformed

class ResponseTransformer:
    """Transform API responses"""

    def __init__(self):
        self.rules: Dict[str, List[TransformRule]] = {}

    def add_rule(self, endpoint: str, rule: TransformRule):
        if endpoint not in self.rules:
            self.rules[endpoint] = []
        self.rules[endpoint].append(rule)

    def transform(self, endpoint: str, response_data: Dict[str, Any]) -> Dict[str, Any]:
        """Apply transformation rules"""
        rules = self.rules.get(endpoint, [])
        transformed = {}

        for rule in rules:
            value = self._get_nested_value(response_data, rule.source_field)

            if value is not None:
                if rule.transform:
                    value = rule.transform(value)
                transformed[rule.target_field] = value

        return transformed

    def _get_nested_value(self, data: Dict[str, Any], field_path: str) -> Any:
        """Get value from nested dictionary"""
        fields = field_path.split('.')
        current = data

        for field in fields:
            if isinstance(current, dict):
                current = current.get(field)
            else:
                return None

        return current

# Example transformations
def transform_user_request(data: Dict[str, Any]) -> Dict[str, Any]:
    """Transform user request"""
    transformer = RequestTransformer()

    # Transform camelCase to snake_case
    transformer.add_rule('/users', TransformRule(
        source_field='firstName',
        target_field='first_name',
        required=True
    ))

    transformer.add_rule('/users', TransformRule(
        source_field='lastName',
        target_field='last_name',
        required=True
    ))

    transformer.add_rule('/users', TransformRule(
        source_field='emailAddress',
        target_field='email',
        transform=lambda x: x.lower()
    ))

    return transformer.transform('/users', data)

def transform_user_response(data: Dict[str, Any]) -> Dict[str, Any]:
    """Transform user response"""
    transformer = ResponseTransformer()

    # Transform snake_case to camelCase
    transformer.add_rule('/users', TransformRule(
        source_field='user_id',
        target_field='userId'
    ))

    transformer.add_rule('/users', TransformRule(
        source_field='first_name',
        target_field='firstName'
    ))

    transformer.add_rule('/users', TransformRule(
        source_field='created_at',
        target_field='createdAt',
        transform=lambda x: x.isoformat() if hasattr(x, 'isoformat') else x
    ))

    return transformer.transform('/users', data)

Response Caching

# Multi-layer caching strategy
import redis
import json
import re  # needed by CacheMiddleware.get_cache_ttl
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass
import hashlib

@dataclass
class CacheConfig:
    default_ttl: int = 300  # 5 minutes
    max_ttl: int = 3600  # 1 hour
    key_prefix: str = "api_cache:"

class APICache:
    """Multi-layer API cache"""

    def __init__(self, redis_client: redis.Redis, config: CacheConfig):
        self.redis = redis_client
        self.config = config
        self.local_cache: Dict[str, Any] = {}

    def get(self, key: str) -> Optional[Dict[str, Any]]:
        """Get cached response"""
        # Check local cache first
        if key in self.local_cache:
            item = self.local_cache[key]
            if item['expires'] > datetime.utcnow():
                return item['data']
            else:
                del self.local_cache[key]

        # Check Redis
        full_key = f"{self.config.key_prefix}{key}"
        cached = self.redis.get(full_key)

        if cached:
            data = json.loads(cached)
            # Populate local cache
            self.local_cache[key] = {
                'data': data,
                'expires': datetime.utcnow() + timedelta(seconds=60)
            }
            return data

        return None

    def set(self, key: str, data: Dict[str, Any], ttl: Optional[int] = None):
        """Cache response"""
        if ttl is None:
            ttl = self.config.default_ttl
        # Never cache longer than the configured maximum
        ttl = min(ttl, self.config.max_ttl)

        # Set in Redis
        full_key = f"{self.config.key_prefix}{key}"
        self.redis.setex(
            full_key,
            ttl,
            json.dumps(data, default=str)
        )

        # Set in local cache
        self.local_cache[key] = {
            'data': data,
            'expires': datetime.utcnow() + timedelta(seconds=min(ttl, 60))
        }

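    def invalidate(self, pattern: str):
        """Invalidate cache entries whose key starts with `pattern`.

        Keys produced by generate_cache_key are opaque hashes, so prefix
        invalidation only works for keys that keep a readable prefix.
        """
        # Clear local cache (prefix match, consistent with the Redis scan)
        keys_to_delete = [
            key for key in self.local_cache
            if key.startswith(pattern)
        ]
        for key in keys_to_delete:
            del self.local_cache[key]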

        # Clear Redis cache
        full_pattern = f"{self.config.key_prefix}{pattern}*"
        cursor = 0
        while True:
            cursor, keys = self.redis.scan(
                cursor,
                match=full_pattern,
                count=100
            )
            if keys:
                self.redis.delete(*keys)
            if cursor == 0:
                break

    def generate_cache_key(self, method: str, path: str, params: Dict[str, Any] = None) -> str:
        """Generate cache key from request"""
        key_parts = [method, path]

        if params:
            sorted_params = sorted(params.items())
            key_parts.append(json.dumps(sorted_params))

        key_string = ':'.join(key_parts)
        return hashlib.md5(key_string.encode()).hexdigest()

class CacheMiddleware:
    """Cache middleware for API Gateway"""

    def __init__(self, cache: APICache):
        self.cache = cache
        self.cacheable_methods = {'GET', 'HEAD'}
        self.cacheable_status_codes = {200, 203, 204, 206, 300, 301, 404, 410}

    def should_cache(self, method: str, status_code: int, headers: Dict[str, str]) -> bool:
        """Determine if response should be cached"""
        if method not in self.cacheable_methods:
            return False

        if status_code not in self.cacheable_status_codes:
            return False

        # Check cache control headers; a shared cache must also skip 'private'
        cache_control = headers.get('Cache-Control', '').lower()
        if any(d in cache_control for d in ('no-store', 'no-cache', 'private')):
            return False

        return True

    def get_cache_ttl(self, headers: Dict[str, str]) -> Optional[int]:
        """Get cache TTL from headers"""
        cache_control = headers.get('Cache-Control', '')
        if 'max-age' in cache_control:
            match = re.search(r'max-age=(\d+)', cache_control)
            if match:
                return int(match.group(1))
        return None

✅ Caching Strategy

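Use multi-level caching: L1 (local memory) for hot data, L2 (Redis) for shared data, L3 (CDN) for static content. Invalidate entries when the underlying data changes, and keep L1 TTLs short, because each gateway instance holds its own local copy that a Redis delete does not clear.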
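
An L1 cache is usually bounded so hot keys cannot exhaust gateway memory. The following is a minimal sketch of a size-limited local cache with TTL expiry and least-recently-used eviction; the class name `LocalTTLCache`, its parameters, and the injected clock are assumptions made for the example.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Optional, Tuple


class LocalTTLCache:
    """In-process cache with a per-entry TTL and LRU eviction."""

    def __init__(self, max_entries: int, ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self.max_entries = max_entries
        self.ttl_seconds = ttl_seconds
        self.clock = clock  # injectable so expiry can be tested deterministically
        self._items: "OrderedDict[str, Tuple[float, Any]]" = OrderedDict()

    def get(self, key: str) -> Optional[Any]:
        entry = self._items.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if expires_at <= self.clock():
            del self._items[key]      # expired: drop and report a miss
            return None
        self._items.move_to_end(key)  # mark as most recently used
        return value

    def set(self, key: str, value: Any) -> None:
        self._items[key] = (self.clock() + self.ttl_seconds, value)
        self._items.move_to_end(key)
        while len(self._items) > self.max_entries:
            self._items.popitem(last=False)  # evict the least recently used entry


# Usage with a fake clock (hypothetical keys and values).
fake_now = [0.0]
cache = LocalTTLCache(max_entries=2, ttl_seconds=60, clock=lambda: fake_now[0])
cache.set("a", 1)
cache.set("b", 2)
cache.get("a")     # touch "a" so "b" becomes the eviction candidate
cache.set("c", 3)  # exceeds max_entries and evicts "b"
```

A structure like this could stand in for the plain dictionary used as the local layer, at the cost of a little bookkeeping on every read.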

Request Validation

# Request validation with JSON Schema
from typing import Dict, Any, List
from dataclasses import dataclass
import jsonschema
from jsonschema import validate, ValidationError

@dataclass
class ValidationSchema:
    endpoint: str
    method: str
    schema: Dict[str, Any]

class RequestValidator:
    """Request validation with JSON Schema"""

    def __init__(self):
        self.schemas: Dict[str, ValidationSchema] = {}

    def add_schema(self, endpoint: str, method: str, schema: Dict[str, Any]):
        key = f"{method}:{endpoint}"
        self.schemas[key] = ValidationSchema(
            endpoint=endpoint,
            method=method,
            schema=schema
        )

    def validate(self, endpoint: str, method: str, data: Dict[str, Any]) -> List[str]:
        """Validate request data"""
        key = f"{method}:{endpoint}"
        schema = self.schemas.get(key)

        if not schema:
            return []

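        # Collect every violation, not just the first, and enable "format"
        # checks (e.g. email), which jsonschema skips unless a checker is given
        checker = jsonschema.Draft7Validator(
            schema.schema,
            format_checker=jsonschema.FormatChecker()
        )
        errors = [
            f"Validation error: {e.message}"
            for e in checker.iter_errors(data)
        ]

        return errors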

# Example schema
USER_SCHEMA = {
    "type": "object",
    "properties": {
        "firstName": {"type": "string", "minLength": 1, "maxLength": 50},
        "lastName": {"type": "string", "minLength": 1, "maxLength": 50},
        "email": {"type": "string", "format": "email"},
        "age": {"type": "integer", "minimum": 13, "maximum": 120}
    },
    "required": ["firstName", "lastName", "email"],
    "additionalProperties": False
}

# Initialize validator
validator = RequestValidator()
validator.add_schema('/users', 'POST', USER_SCHEMA)

# Validate request
errors = validator.validate('/users', 'POST', {
    'firstName': 'John',
    'lastName': 'Doe',
    'email': 'john@example.com',
    'age': 30
})

Summary

| Component      | Purpose               | Implementation               |
|----------------|-----------------------|------------------------------|
| Rate Limiting  | Request throttling    | Token bucket, sliding window |
| Authentication | Identity verification | JWT, API keys                |
| Authorization  | Permission checking   | RBAC, ABAC                   |
| Transformation | Data mapping          | Request/response transforms  |
| Caching        | Response caching      | Multi-level caching          |
| Validation     | Request validation    | JSON Schema                  |