Caching at Scale: Redis, CDN, Cache Invalidation Patterns
Difficulty: Senior Level | Companies: Netflix, Twitter, Facebook, Redis Labs, Cloudflare
Interview Question
"Design a caching architecture for a high-traffic application serving 1 million requests per second. How do you handle cache invalidation, consistency, and failure scenarios?"
ℹ️ Key Concepts
This question tests your understanding of caching patterns, distributed caching, and cache invalidation strategies.
Complete Caching Architecture
Architecture Overview
+------------------------------------------------------------+
|              MULTI-LAYER CACHING ARCHITECTURE              |
+------------------------------------------------------------+
|                                                            |
|  +-------------------- CLIENT LAYER --------------------+  |
|  | Browser Cache | Service Worker | Local Storage       |  |
|  +---------------------------+--------------------------+  |
|                              |                             |
|  +--------------------- CDN LAYER ----------------------+  |
|  | CloudFront | Cloudflare | Akamai                     |  |
|  | Edge locations | Origin shield | Regional edge       |  |
|  +---------------------------+--------------------------+  |
|                              |                             |
|  +----------------- APPLICATION LAYER ------------------+  |
|  | In-Memory Cache (Redis Cluster / Memcached)          |  |
|  | [ L1 (Local) ]  [ L2 (Redis) ]  [ L3 (CDN) ]         |  |
|  +---------------------------+--------------------------+  |
|                              |                             |
|  +------------------- DATABASE LAYER -------------------+  |
|  | PostgreSQL | MongoDB | DynamoDB                      |  |
|  +------------------------------------------------------+  |
|                                                            |
+------------------------------------------------------------+
Mathematical Foundation: Cache Metrics
Cache Hit Ratio:
- Hit ratio = hits / (hits + misses)
- For 1M RPS with 99% hit ratio: 10K misses/second
- For 1M RPS with 95% hit ratio: 50K misses/second
Cache Latency:
- L1 (local memory): 0.1ms
- L2 (Redis): 1ms
- L3 (CDN): 10ms
- Database: 100ms
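The hit-ratio and latency figures above can be combined into an expected per-request latency. The following is a minimal sketch, assuming lookups cascade through the layers in order and every request that reaches a layer pays that layer's latency; the function names and the 80% / 95% hit ratios in the usage note are illustrative assumptions, not measurements.

```python
from typing import List, Tuple


def miss_rate(rps: float, hit_ratio: float) -> float:
    """Requests per second that miss a cache with the given hit ratio."""
    return rps * (1.0 - hit_ratio)


def expected_latency_ms(layers: List[Tuple[float, float]], backend_ms: float) -> float:
    """Average latency for a cascade of cache layers in front of a backend.

    layers: (hit_ratio, latency_ms) pairs, checked in order.
    A request pays the latency of every layer it reaches; requests that
    miss all layers also pay backend_ms.
    """
    total = 0.0
    reach = 1.0  # fraction of requests that reach the current layer
    for hit_ratio, latency_ms in layers:
        total += reach * latency_ms
        reach *= 1.0 - hit_ratio
    return total + reach * backend_ms


if __name__ == "__main__":
    # Assumed hit ratios: 80% in local memory, 95% in Redis.
    print(miss_rate(1_000_000, 0.99))
    print(expected_latency_ms([(0.80, 0.1), (0.95, 1.0)], backend_ms=100.0))
```

Under these assumed ratios only 1% of requests reach the database, so the average stays close to the Redis latency rather than the database latency.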
Cache Size Calculation:
- Total items: N = 1,000,000
- Average item size: S = 1KB
- Total cache size: T = N × S = 1GB
- With 20% overhead: T_overhead = 1.2GB
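Eviction Rate:
- Cache size: C = 1GB
- Item size: S = 1KB
- Max items: M = C / S = 1,000,000
- With LRU eviction and a full cache, every miss that inserts a new item evicts one: evictions/second ≈ requests/second - hits/second
- Fraction of the cache replaced per second = (requests - hits) / M; at 10K misses/second this is 1%, so the whole cache turns over in about 100 seconds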
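The sizing and eviction arithmetic above can be checked with a few lines of code. This is a rough sketch using decimal units (1KB = 1,000 bytes) to match the round numbers in the text; the function names are hypothetical, and the eviction estimate assumes a full cache where every miss inserts one item.

```python
def cache_size_bytes(items: int, item_bytes: int, overhead_pct: int = 20) -> int:
    """Memory needed for `items` entries plus a percentage of overhead."""
    return items * item_bytes * (100 + overhead_pct) // 100


def evictions_per_second(requests_per_s: int, hits_per_s: int) -> int:
    """With a full LRU cache, each inserted miss pushes out one old item."""
    return requests_per_s - hits_per_s


def turnover_seconds(max_items: int, requests_per_s: int, hits_per_s: int) -> float:
    """Time for evictions to replace as many items as the cache holds."""
    return max_items / evictions_per_second(requests_per_s, hits_per_s)


if __name__ == "__main__":
    print(cache_size_bytes(1_000_000, 1_000))                 # bytes, with 20% overhead
    print(turnover_seconds(1_000_000, 1_000_000, 990_000))    # seconds at a 99% hit ratio
```

A short turnover time relative to the TTL suggests the cache is too small for the working set, since items are being evicted before they expire.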
Redis Cluster Implementation
# Redis cluster client with replication (assumes redis-py 4.1 or later)
import hashlib
import json
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional

from redis.cluster import ClusterNode, RedisCluster


@dataclass
class RedisClusterConfig:
    nodes: List[Dict[str, str]]  # e.g. [{"host": "10.0.0.1", "port": "6379"}]
    password: Optional[str] = None
    ssl: bool = True
    max_connections: int = 100


class RedisClusterManager:
    """Redis cluster manager with JSON serialization.

    Redis errors are reported and treated as cache misses, so an outage
    degrades to database reads instead of failing requests.
    """

    def __init__(self, config: RedisClusterConfig):
        self.config = config
        # redis-py expects ClusterNode objects rather than plain dicts.
        startup_nodes = [
            ClusterNode(node["host"], int(node["port"])) for node in config.nodes
        ]
        self.cluster = RedisCluster(
            startup_nodes=startup_nodes,
            password=config.password,
            ssl=config.ssl,
            max_connections=config.max_connections,
            retry_on_timeout=True,
        )

    def get(self, key: str) -> Optional[Any]:
        """Get value from cache; returns None on a miss or a Redis error"""
        try:
            value = self.cluster.get(key)
            return json.loads(value) if value is not None else None
        except Exception as e:
            print(f"Redis get error: {e}")
            return None

    def set(self, key: str, value: Any, ttl: int = 3600) -> bool:
        """Set value in cache with TTL (seconds)"""
        try:
            serialized = json.dumps(value, default=str)
            self.cluster.setex(key, ttl, serialized)
            return True
        except Exception as e:
            print(f"Redis set error: {e}")
            return False

    def delete(self, key: str) -> None:
        """Delete key from cache"""
        try:
            self.cluster.delete(key)
        except Exception as e:
            print(f"Redis delete error: {e}")

    def mget(self, keys: List[str]) -> List[Optional[Any]]:
        """Get multiple values"""
        try:
            # Plain MGET fails with CROSSSLOT when keys live in different
            # hash slots; mget_nonatomic groups the keys by slot first.
            values = self.cluster.mget_nonatomic(keys)
            return [json.loads(v) if v is not None else None for v in values]
        except Exception as e:
            print(f"Redis mget error: {e}")
            return [None] * len(keys)

    def mset(self, items: Dict[str, Any], ttl: int = 3600) -> bool:
        """Set multiple values, each with the same TTL"""
        try:
            # The cluster pipeline batches commands per node; it is not atomic.
            pipe = self.cluster.pipeline()
            for key, value in items.items():
                serialized = json.dumps(value, default=str)
                pipe.setex(key, ttl, serialized)
            pipe.execute()
            return True
        except Exception as e:
            print(f"Redis mset error: {e}")
            return False

    def get_or_set(self, key: str, factory: Callable[[], Any], ttl: int = 3600) -> Any:
        """Get from cache, or compute with factory() and cache the result"""
        value = self.get(key)
        if value is None:
            value = factory()
            self.set(key, value, ttl)
        return value

    def invalidate_pattern(self, pattern: str) -> None:
        """Invalidate keys matching pattern"""
        try:
            # SCAN iterates incrementally; KEYS would block each node while it
            # walks the whole keyspace. Keys are deleted one at a time because
            # a multi-key DEL cannot span hash slots.
            for key in self.cluster.scan_iter(match=pattern, count=1000):
                self.cluster.delete(key)
        except Exception as e:
            print(f"Redis pattern delete error: {e}")

    def get_cluster_info(self) -> Dict[str, Any]:
        """Get cluster information"""
        return self.cluster.cluster_info()
import time
import uuid


class DistributedLock:
    """Distributed lock using Redis (SET NX EX with owner-checked release)"""

    def __init__(self, redis_manager: RedisClusterManager):
        self.redis = redis_manager

    def acquire(self, lock_name: str, timeout: int = 10,
                blocking: bool = True) -> Optional[str]:
        """Acquire distributed lock.

        Returns the owner token to pass to release(), or None if the lock
        could not be acquired. `timeout` (seconds) is used both as the lock
        TTL and as the maximum time to wait.
        """
        lock_key = f"lock:{lock_name}"
        identifier = uuid.uuid4().hex  # unique token identifying this holder
        deadline = time.monotonic() + timeout
        while True:
            # SET ... NX EX succeeds only if the key does not exist yet.
            # The raw client is used because RedisClusterManager.set()
            # has no nx option and JSON-encodes the value.
            if self.redis.cluster.set(lock_key, identifier, nx=True, ex=timeout):
                return identifier
            if not blocking or time.monotonic() >= deadline:
                return None
            time.sleep(0.1)  # wait before retry

    def release(self, lock_name: str, identifier: str) -> bool:
        """Release distributed lock if it is still held by `identifier`"""
        lock_key = f"lock:{lock_name}"
        # Lua script makes the compare-and-delete atomic, so a holder whose
        # lock has expired cannot delete a lock acquired by someone else.
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        try:
            result = self.redis.cluster.eval(lua_script, 1, lock_key, identifier)
            return result == 1
        except Exception as e:
            print(f"Lock release error: {e}")
            return False
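A common use of such a lock is protecting the database from a cache stampede: when a hot key expires, only one caller should reload it while the rest wait and then read the refreshed entry. The sketch below illustrates the idea with in-process stand-ins (a dict for the cache and `threading.Lock` for the distributed lock) so it runs without a Redis server; `SingleFlightCache` and `get_or_load` are hypothetical names, not part of the classes above.

```python
import threading
from typing import Any, Callable, Dict


class SingleFlightCache:
    """Lets one caller per key run the loader; others wait and reuse the result."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}                  # stand-in for Redis
        self._locks: Dict[str, threading.Lock] = {}      # stand-in for per-key locks
        self._guard = threading.Lock()                   # protects the lock table

    def _lock_for(self, key: str) -> threading.Lock:
        with self._guard:
            return self._locks.setdefault(key, threading.Lock())

    def get_or_load(self, key: str, loader: Callable[[str], Any]) -> Any:
        if key in self._data:            # fast path: cache hit
            return self._data[key]
        with self._lock_for(key):
            if key in self._data:        # re-check: another caller may have loaded it
                return self._data[key]
            value = loader(key)          # only one caller per key gets here
            self._data[key] = value
            return value
```

The second lookup inside the lock is the important step: without it, every waiting caller would still hit the database once it obtained the lock.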
Cache Invalidation Patterns
# Cache invalidation strategies
import asyncio
from typing import Dict, Any, Callable, Optional


class CacheInvalidationStrategy:
    """Cache invalidation strategies"""

    def __init__(self, redis_manager: RedisClusterManager):
        self.redis = redis_manager
        self._tasks = set()  # strong references to in-flight background tasks

    def _spawn(self, coro) -> None:
        """Schedule a coroutine; requires a running asyncio event loop"""
        task = asyncio.create_task(coro)
        self._tasks.add(task)  # keep a reference so the task is not garbage-collected
        task.add_done_callback(self._tasks.discard)

    def write_through(self, key: str, value: Any, writer: Callable,
                      ttl: int = 3600):
        """Write-through caching"""
        # Write to the database first: if it fails, the cache is left
        # untouched and never holds data the database rejected.
        writer(key, value)
        # Then update the cache
        self.redis.set(key, value, ttl)

    def write_behind(self, key: str, value: Any, writer: Callable,
                     delay: int = 5):
        """Write-behind caching (async write).

        Must be called from code running inside an asyncio event loop.
        Writes still pending when the process exits are lost.
        """
        # Write to cache immediately
        self.redis.set(key, value)
        # Schedule async write to database
        self._spawn(self._delayed_write(key, value, writer, delay))

    async def _delayed_write(self, key: str, value: Any, writer: Callable,
                             delay: int):
        """Delayed write to database"""
        await asyncio.sleep(delay)
        # Run the blocking writer in a worker thread (Python 3.9+)
        await asyncio.to_thread(writer, key, value)

    def cache_aside(self, key: str, loader: Callable, ttl: int = 3600) -> Any:
        """Cache-aside pattern"""
        # Try to get from cache
        value = self.redis.get(key)
        if value is None:
            # Load from database
            value = loader(key)
            # Store in cache
            if value is not None:
                self.redis.set(key, value, ttl)
        return value

    def refresh_ahead(self, key: str, loader: Callable, ttl: int = 3600,
                      refresh_threshold: float = 0.8):
        """Refresh-ahead caching (call from inside an asyncio event loop)"""
        # Get from cache
        value = self.redis.get(key)
        if value is None:
            # Load from database
            value = loader(key)
            self.redis.set(key, value, ttl)
        else:
            # Refresh once `refresh_threshold` of the TTL has elapsed, i.e.
            # when less than (1 - refresh_threshold) of it remains.
            # TTL returns -1 (no expiry) or -2 (missing key); skip those.
            ttl_remaining = self.redis.cluster.ttl(key)
            if 0 <= ttl_remaining < ttl * (1 - refresh_threshold):
                # Refresh in background
                self._spawn(self._background_refresh(key, loader, ttl))
        return value

    async def _background_refresh(self, key: str, loader: Callable, ttl: int):
        """Background cache refresh"""
        try:
            value = await asyncio.to_thread(loader, key)
            self.redis.set(key, value, ttl)
        except Exception as e:
            print(f"Background refresh failed: {e}")

    def event_driven_invalidation(self, event_type: str, key_pattern: str):
        """Event-driven cache invalidation.

        Blocks while listening, so run it in a dedicated thread or process.
        """
        # Subscribe to events
        pubsub = self.redis.cluster.pubsub()
        pubsub.subscribe(f"cache:{event_type}")
        for message in pubsub.listen():
            if message['type'] == 'message':
                # Invalidate on every event, not only the first one
                self.redis.invalidate_pattern(key_pattern)
class CacheWarmer:
    """Cache warming for cold starts"""

    def __init__(self, redis_manager: RedisClusterManager):
        self.redis = redis_manager

    def warm_cache(self, keys: list, loader: Callable, ttl: int = 3600):
        """Warm cache with data"""
        # Load everything first, then write in one pipelined batch.
        items = {}
        for key in keys:
            value = loader(key)
            if value is not None:
                items[key] = value
        if items:
            self.redis.mset(items, ttl)

    def warm_popular_items(self, popular_items: list, loader: Callable,
                           ttl: int = 3600):
        """Warm cache with popular items"""
        # Sort by popularity, most popular first
        sorted_items = sorted(popular_items,
                              key=lambda x: x.get('popularity', 0),
                              reverse=True)
        # Cache top 1000 items
        for item in sorted_items[:1000]:
            key = f"item:{item['id']}"
            value = loader(item['id'])
            if value is not None:
                self.redis.set(key, value, ttl)
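⚠️ Cache Invalidation
Cache invalidation is famously one of the hardest problems in computer science. Choose a strategy based on how much staleness the data can tolerate: TTL expiry alone allows reads to be stale for up to the full TTL, while write-through and event-driven invalidation narrow that window at the cost of extra work on the write path.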
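For data that cannot tolerate a full TTL of staleness, a simple and widely used option is to pair cache-aside reads with invalidate-on-write: update the source of truth first, then delete the cached entry so the next read repopulates it. A minimal sketch, using plain dicts as hypothetical stand-ins for the database and the Redis cache:

```python
from typing import Dict, Optional


def read_user(user_id: int, cache: Dict[str, str], db: Dict[int, str]) -> Optional[str]:
    """Cache-aside read: try the cache, fall back to the database."""
    key = f"user:{user_id}"
    if key in cache:
        return cache[key]
    value = db.get(user_id)
    if value is not None:
        cache[key] = value           # populate for later readers
    return value


def update_user(user_id: int, new_value: str,
                cache: Dict[str, str], db: Dict[int, str]) -> None:
    """Invalidate-on-write: commit to the database, then drop the cached copy."""
    db[user_id] = new_value                  # source of truth first
    cache.pop(f"user:{user_id}", None)       # next read reloads the fresh value


if __name__ == "__main__":
    db = {1: "alice"}
    cache: Dict[str, str] = {}
    print(read_user(1, cache, db))
    update_user(1, "alicia", cache, db)
    print(read_user(1, cache, db))
```

Deleting rather than overwriting the entry avoids caching a value computed from a write that later concurrent writes have superseded; a short TTL is still worth keeping as a safety net in case an invalidation is lost.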
CDN Configuration
# CloudFront CDN configuration
import time
import uuid
import boto3
from typing import Dict, Any, List, Optional
from dataclasses import dataclass


@dataclass
class CDNConfig:
    origin_domain: str
    origin_path: str = ''
    default_root_object: str = 'index.html'
    error_responses: Optional[Dict[int, str]] = None


class CDNManager:
    """CloudFront CDN manager"""

    def __init__(self):
        self.cloudfront = boto3.client('cloudfront')

    def create_distribution(self, config: CDNConfig) -> str:
        """Create CloudFront distribution and return its ID"""
        response = self.cloudfront.create_distribution(
            DistributionConfig={
                # Must be unique per request; hash() of a string changes
                # between Python processes, so use a UUID instead.
                'CallerReference': str(uuid.uuid4()),
                'Aliases': {
                    'Quantity': 0
                },
                'DefaultRootObject': config.default_root_object,
                'Origins': {
                    'Quantity': 1,
                    'Items': [
                        {
                            'Id': 'origin',
                            'DomainName': config.origin_domain,
                            'OriginPath': config.origin_path,
                            'CustomHeaders': {
                                'Quantity': 0
                            },
                            'CustomOriginConfig': {
                                'HTTPPort': 80,
                                'HTTPSPort': 443,
                                'OriginProtocolPolicy': 'https-only',
                                'OriginSslProtocols': {
                                    'Quantity': 1,
                                    'Items': ['TLSv1.2']
                                },
                                'OriginReadTimeout': 30,
                                'OriginKeepaliveTimeout': 5
                            }
                        }
                    ]
                },
                # Default behavior: long-lived caching for static content
                'DefaultCacheBehavior': {
                    'TargetOriginId': 'origin',
                    'ViewerProtocolPolicy': 'redirect-to-https',
                    'AllowedMethods': {
                        'Quantity': 7,
                        'Items': ['GET', 'HEAD', 'OPTIONS', 'PUT', 'POST', 'PATCH', 'DELETE'],
                        # CachedMethods is nested inside AllowedMethods
                        'CachedMethods': {
                            'Quantity': 2,
                            'Items': ['GET', 'HEAD']
                        }
                    },
                    'ForwardedValues': {
                        'QueryString': False,
                        'Cookies': {'Forward': 'none'},
                        'Headers': {
                            'Quantity': 0
                        },
                        'QueryStringCacheKeys': {
                            'Quantity': 0
                        }
                    },
                    'MinTTL': 0,
                    'DefaultTTL': 86400,     # 1 day
                    'MaxTTL': 31536000,      # 1 year
                    'Compress': True
                },
                # API behavior: not cached by default, at most 5 minutes
                'CacheBehaviors': {
                    'Quantity': 1,
                    'Items': [
                        {
                            'PathPattern': '/api/*',
                            'TargetOriginId': 'origin',
                            'ViewerProtocolPolicy': 'redirect-to-https',
                            'AllowedMethods': {
                                'Quantity': 2,
                                'Items': ['GET', 'HEAD'],
                                'CachedMethods': {
                                    'Quantity': 2,
                                    'Items': ['GET', 'HEAD']
                                }
                            },
                            'ForwardedValues': {
                                'QueryString': True,
                                'Cookies': {'Forward': 'none'},
                                # Forwarded headers become part of the cache key
                                'Headers': {
                                    'Quantity': 3,
                                    'Items': ['Authorization', 'Accept', 'Accept-Language']
                                }
                            },
                            'MinTTL': 0,
                            'DefaultTTL': 0,
                            'MaxTTL': 300
                        }
                    ]
                },
                'Comment': f'CDN for {config.origin_domain}',
                'Enabled': True,
                'HttpVersion': 'http2and3',
                'IsIPV6Enabled': True
            }
        )
        return response['Distribution']['Id']

    def invalidate_cache(self, distribution_id: str, paths: List[str]):
        """Invalidate CDN cache for the given paths (e.g. ['/index.html', '/img/*'])"""
        self.cloudfront.create_invalidation(
            DistributionId=distribution_id,
            InvalidationBatch={
                'Paths': {
                    'Quantity': len(paths),
                    'Items': paths
                },
                'CallerReference': str(time.time())
            }
        )
class CacheHeaders:
    """Cache control headers"""

    @staticmethod
    def no_cache():
        """No caching"""
        return {
            'Cache-Control': 'no-store, no-cache, must-revalidate',
            'Pragma': 'no-cache'
        }

    @staticmethod
    def cache_for_seconds(seconds: int):
        """Cache for specific duration"""
        return {
            'Cache-Control': f'public, max-age={seconds}'
        }

    @staticmethod
    def cache_while_revalidating(seconds: int):
        """Cache while revalidating"""
        return {
            'Cache-Control': f'public, max-age={seconds}, stale-while-revalidate=86400'
        }

    @staticmethod
    def private_cache(seconds: int):
        """Private cache"""
        return {
            'Cache-Control': f'private, max-age={seconds}'
        }
Cache Monitoring
# Cache monitoring and metrics
import boto3
from typing import Dict, Any
from datetime import datetime, timedelta, timezone


class CacheMonitor:
    """Cache monitoring and alerting"""

    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')

    def _latest_stat(self, cluster_name: str, metric_name: str,
                     statistic: str) -> float:
        """Most recent 5-minute datapoint from the last hour, or 0.0 if none"""
        now = datetime.now(timezone.utc)
        response = self.cloudwatch.get_metric_statistics(
            Namespace='AWS/ElastiCache',
            MetricName=metric_name,
            Dimensions=[
                {
                    'Name': 'CacheClusterId',
                    'Value': cluster_name
                }
            ],
            StartTime=now - timedelta(hours=1),
            EndTime=now,
            Period=300,
            Statistics=[statistic]
        )
        datapoints = response['Datapoints']
        if not datapoints:
            return 0.0
        # Datapoints are not returned in chronological order, so pick
        # the newest by timestamp instead of taking the last element.
        latest = max(datapoints, key=lambda d: d['Timestamp'])
        return latest[statistic]

    def get_hit_ratio(self, cluster_name: str) -> float:
        """Get cache hit ratio"""
        return self._latest_stat(cluster_name, 'CacheHitRate', 'Average')

    def get_evictions(self, cluster_name: str) -> float:
        """Get cache evictions"""
        return self._latest_stat(cluster_name, 'Evictions', 'Sum')

    def get_memory_usage(self, cluster_name: str) -> float:
        """Get memory usage percentage"""
        return self._latest_stat(cluster_name, 'DatabaseMemoryUsagePercentage', 'Average')

    def create_alert(self, cluster_name: str, metric_name: str,
                     threshold: float, alarm_name: str,
                     comparison_operator: str = 'GreaterThanThreshold'):
        """Create CloudWatch alarm.

        Use 'LessThanThreshold' for metrics where low values are the
        problem, such as CacheHitRate.
        """
        self.cloudwatch.put_metric_alarm(
            AlarmName=alarm_name,
            AlarmDescription=f'Alarm for {metric_name}',
            MetricName=metric_name,
            Namespace='AWS/ElastiCache',
            Statistic='Average',
            Period=300,
            EvaluationPeriods=3,
            Threshold=threshold,
            ComparisonOperator=comparison_operator,
            Dimensions=[
                {
                    'Name': 'CacheClusterId',
                    'Value': cluster_name
                }
            ],
            AlarmActions=[],
            OKActions=[]
        )
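✅ Caching Benefits
A well-designed caching architecture can cut database load by 90% or more and improve response times by roughly 10x: at a 99% hit ratio the database sees only 1% of reads, and a Redis hit (about 1ms) is far cheaper than a database query (about 100ms). Use multi-level caching so that each request is served by the fastest layer that holds the data.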
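The multi-level lookup can be sketched as a small in-process LRU (L1) in front of a shared cache (L2), with the database consulted only when both miss. In this illustrative sketch a dict stands in for Redis and `loader` for the database query; `TwoLevelCache` and its parameters are hypothetical names.

```python
from collections import OrderedDict
from typing import Any, Callable, Dict


class TwoLevelCache:
    """L1: bounded in-process LRU. L2: shared cache (a dict stands in for Redis)."""

    def __init__(self, l2: Dict[str, Any], loader: Callable[[str], Any],
                 l1_capacity: int = 2) -> None:
        self.l1: "OrderedDict[str, Any]" = OrderedDict()
        self.l2 = l2
        self.loader = loader
        self.l1_capacity = l1_capacity

    def get(self, key: str) -> Any:
        if key in self.l1:                   # L1 hit: mark as most recently used
            self.l1.move_to_end(key)
            return self.l1[key]
        if key in self.l2:                   # L2 hit: promote into L1
            value = self.l2[key]
        else:                                # miss everywhere: load and fill L2
            value = self.loader(key)
            self.l2[key] = value
        self._put_l1(key, value)
        return value

    def _put_l1(self, key: str, value: Any) -> None:
        self.l1[key] = value
        self.l1.move_to_end(key)
        if len(self.l1) > self.l1_capacity:
            self.l1.popitem(last=False)      # evict the least recently used entry
```

Keeping L1 small and short-lived limits how stale a single application instance can become, since L1 entries are not visible to invalidations issued against the shared cache.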
Summary
| Layer | Technology | Latency | Use Case |
|---|---|---|---|
| L1 | Local memory | 0.1ms | Hot data |
| L2 | Redis Cluster | 1ms | Session data |
| L3 | CDN | 10ms | Static content |
| L4 | Database | 100ms | Source of truth |