API Rate Limiting - Throttling, Quotas, Redis-Based

API Development › API Rate Limiting › Free Lesson

Advertisement

Introduction

Rate limiting protects APIs from abuse and ensures fair resource allocation. This tutorial covers various rate limiting strategies including token bucket, sliding window, and Redis-based implementations.

In-Memory Rate Limiter

from time import time
from collections import defaultdict

class RateLimiter:
    """In-memory sliding-window rate limiter.

    Each accepted request is recorded as a timestamp under its key. A request
    is accepted while fewer than ``max_requests`` recorded timestamps fall
    inside the last ``window_seconds`` seconds.

    No locking is performed, and all state lives in this process only.
    """

    def __init__(self, max_requests: int, window_seconds: int):
        """Create a limiter allowing ``max_requests`` per ``window_seconds``."""
        self.max_requests = max_requests
        self.window = window_seconds
        # key -> timestamps (seconds since the epoch) of accepted requests
        self.requests = defaultdict(list)

    def _recent(self, key: str, now: float) -> list:
        """Return the timestamps for ``key`` that are still inside the window.

        Uses ``.get`` rather than indexing so that merely looking at a key
        never creates an entry in the ``defaultdict``.
        """
        window_start = now - self.window
        return [t for t in self.requests.get(key, ()) if t > window_start]

    def is_allowed(self, key: str) -> bool:
        """Record a request for ``key`` and report whether it is permitted.

        Returns:
            ``True`` if the request fits in the current window (it is then
            recorded), ``False`` if the limit has already been reached.
        """
        now = time()
        recent = self._recent(key, now)

        allowed = len(recent) < self.max_requests
        if allowed:
            recent.append(now)

        if recent:
            # Store the pruned list so expired timestamps are dropped.
            self.requests[key] = recent
        else:
            # Nothing left to remember: do not keep an empty entry around.
            self.requests.pop(key, None)

        return allowed

    def get_remaining(self, key: str) -> int:
        """Return how many more requests ``key`` may make in the current window.

        This is a pure read: it neither records a request nor creates an
        entry for a key that has never been seen.
        """
        current_count = len(self._recent(key, time()))
        return max(0, self.max_requests - current_count)

# Shared limiter: each key may make up to 100 requests per 60-second window.
limiter = RateLimiter(window_seconds=60, max_requests=100)

# Usage: check the caller's key before doing any work.
if not limiter.is_allowed("user123"):
    # Return 429 Too Many Requests
    pass
else:
    # Process request
    pass

Token Bucket Algorithm

import threading

class TokenBucket:
    """Token bucket guarded by a lock.

    The bucket starts full with ``capacity`` tokens and regains
    ``refill_rate`` tokens per second, never exceeding ``capacity``.
    All public methods take ``self.lock`` before touching the balance.
    """

    def __init__(self, capacity: int, refill_rate: float):
        """Create a full bucket.

        Args:
            capacity: Maximum number of tokens the bucket can hold.
            refill_rate: Tokens added per second of elapsed time.
        """
        self.capacity = capacity
        self.tokens = capacity
        self.refill_rate = refill_rate
        self.last_refill = time()
        self.lock = threading.Lock()

    def consume(self, tokens: int = 1) -> bool:
        """Try to take ``tokens`` tokens from the bucket.

        Returns:
            ``True`` if enough tokens were available (they are deducted),
            ``False`` otherwise. A request for more than ``capacity`` tokens
            can never succeed.

        Raises:
            ValueError: If ``tokens`` is negative. A negative amount would
                otherwise pass the availability check and *add* tokens,
                pushing the balance above ``capacity``.
        """
        if tokens < 0:
            raise ValueError("tokens must be non-negative")

        with self.lock:
            self._refill()

            if self.tokens >= tokens:
                self.tokens -= tokens
                return True

            return False

    def _refill(self):
        """Credit tokens for the time elapsed since the last refill.

        Callers must hold ``self.lock``.
        """
        now = time()
        # time() is wall-clock time and can step backwards (e.g. a clock
        # adjustment); clamp so a backwards step never removes tokens.
        elapsed = max(0.0, now - self.last_refill)
        new_tokens = elapsed * self.refill_rate

        self.tokens = min(self.capacity, self.tokens + new_tokens)
        self.last_refill = now

    def get_tokens(self) -> float:
        """Return the current token balance after applying any pending refill."""
        with self.lock:
            self._refill()
            return self.tokens

# Usage
# Bucket holds at most 100 tokens and regains one token every second.
bucket = TokenBucket(refill_rate=1.0, capacity=100)

# Take a single token (the default cost) before handling the request.
if bucket.consume(tokens=1):
    # Process request
    pass

Redis-Based Rate Limiter

import uuid
from time import time

import redis

class RedisRateLimiter:
    """Sliding-window rate limiter backed by a Redis sorted set.

    Each accepted request is stored as one member of the sorted set at
    ``key``, scored by its timestamp. Entries older than the window are
    trimmed on every check, and the key is given a TTL equal to the window.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        """Connect to the Redis server at ``redis_url``."""
        self.redis = redis.from_url(redis_url)

    def is_allowed(self, key: str, max_requests: int, window: int) -> bool:
        """Record a request under ``key`` and report whether it is permitted.

        Args:
            key: Sorted-set key identifying the caller (e.g. ``"ip:1.2.3.4"``).
            max_requests: Maximum number of requests allowed per window.
            window: Window length in seconds; also used as the key's TTL.

        Returns:
            ``True`` if fewer than ``max_requests`` requests were already in
            the window, ``False`` otherwise (the tentative entry is removed).
        """
        now = time()
        window_start = now - window
        # Sorted-set members are unique, so a bare timestamp would merge two
        # requests arriving at the same instant into one entry (and the
        # rollback below could delete another request's entry). A random
        # suffix keeps every request distinct.
        member = f"{now}:{uuid.uuid4().hex}"

        pipe = self.redis.pipeline()

        # Remove entries that have fallen out of the window
        pipe.zremrangebyscore(key, 0, window_start)

        # Count requests already in the window (before adding this one)
        pipe.zcard(key)

        # Tentatively add this request
        pipe.zadd(key, {member: now})

        # Let an idle key disappear once its window has passed
        pipe.expire(key, window)

        results = pipe.execute()
        current_count = results[1]

        # current_count excludes the request just added, so the limit is
        # reached as soon as it equals max_requests.
        if current_count >= max_requests:
            # Roll back the tentative entry so rejected requests don't count
            self.redis.zrem(key, member)
            return False

        return True

    def get_window(self, key: str, max_requests: int = 100) -> dict:
        """Return usage statistics for ``key``.

        Args:
            key: Sorted-set key to inspect.
            max_requests: Limit used to compute ``"remaining"``. Defaults to
                100, the value that was previously hard-coded.

        Returns:
            A dict with ``"requests"`` (entries currently stored under the
            key; expired entries are only trimmed by ``is_allowed``),
            ``"remaining"`` and ``"reset_in"`` (the value of the Redis TTL
            command for the key).
        """
        count = self.redis.zcard(key)
        ttl = self.redis.ttl(key)

        return {
            "requests": count,
            "remaining": max(0, max_requests - count),
            "reset_in": ttl
        }

# Usage
limiter = RedisRateLimiter()

@app.middleware("http")
async def rate_limit_middleware(request, call_next):
    """Reject requests from clients that exceed 100 requests per 60 seconds.

    Requests over the limit get a 429 JSON response with a ``Retry-After``
    header; all others are passed on to ``call_next``.
    """
    max_requests = 100
    window_seconds = 60

    # request.client can be None when the server cannot determine the peer
    # address; such requests share a single "unknown" bucket instead of
    # crashing on the attribute access.
    client = request.client
    client_ip = client.host if client is not None else "unknown"

    # NOTE(review): limiter.is_allowed is a synchronous Redis call and blocks
    # the event loop while it runs; consider a threadpool or an async client.
    if not limiter.is_allowed(f"ip:{client_ip}", max_requests, window_seconds):
        return JSONResponse(
            {"error": "Rate limit exceeded"},
            status_code=429,
            # Derived from the window so the header cannot drift from the limit.
            headers={"Retry-After": str(window_seconds)}
        )

    return await call_next(request)

FastAPI Rate Limiter

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse

# Using slowapi
from slowapi import Limiter
from slowapi.util import get_remote_address

# FastAPI application whose routes are rate limited below.
app = FastAPI()
# slowapi limiter keyed by the value get_remote_address returns for a request.
limiter = Limiter(key_func=get_remote_address)

@app.get("/items")
@limiter.limit("100/minute")
async def list_items(request: Request):
    """Return the demo item list (limit string: ``100/minute``)."""
    items = ["item1", "item2"]
    return {"items": items}

@app.get("/users")
@limiter.limit("10/second")
async def list_users(request: Request):
    """Return the demo user list (limit string: ``10/second``)."""
    users = ["user1", "user2"]
    return {"users": users}

Practice Problems

  1. Implement a distributed rate limiter using Redis
  2. Add rate limiting per API endpoint
  3. Create rate limiting with burst allowance
  4. Implement rate limit headers (X-RateLimit-Limit, etc.)
  5. Add rate limiting with different tiers (free, premium)

Advertisement

Need Expert Python Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement