Building Real-Time Pipelines: Kafka + Flink + Redis
Designing end-to-end real-time data systems
Interview Question
"Design a real-time recommendation system for an e-commerce platform that: (1) processes 500K events/second, (2) updates user profiles in real-time, (3) serves recommendations with <100ms latency, (4) handles exactly-once processing, (5) scales dynamically. Include architecture, technology choices, and code examples."
Difficulty: Hard | Frequently asked at Uber, Netflix, Amazon, Meta
Theoretical Foundation
Real-Time Pipeline Architecture
Sources → Ingestion → Processing → Storage → Serving

Typical choices per stage:
- Sources: web app, mobile app, IoT devices
- Ingestion: Kafka, Kinesis, Pulsar
- Processing: Flink, Spark
- Storage: Redis, DynamoDB, PostgreSQL
- Serving: API, gRPC, REST

Latency requirements:
- Ingestion: < 10 ms
- Processing: < 100 ms
- Storage: < 50 ms
- Serving: < 100 ms
- Total end-to-end: < 500 ms
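The latency budget can be sanity-checked with simple arithmetic. The sketch below copies the per-stage limits from the list above; the `sample` measurements and the `check_budget` helper are made up for illustration.

```python
# Per-stage latency limits (ms) copied from the budget above.
STAGE_BUDGET_MS = {"ingestion": 10, "processing": 100, "storage": 50, "serving": 100}
END_TO_END_BUDGET_MS = 500


def check_budget(measured_ms):
    """Return the stages over their limit and the end-to-end headroom in ms."""
    over = [s for s, limit in STAGE_BUDGET_MS.items() if measured_ms.get(s, 0) > limit]
    headroom = END_TO_END_BUDGET_MS - sum(measured_ms.values())
    return over, headroom


# The stage limits sum to 260 ms, which leaves 240 ms for network hops and queueing.
stage_total = sum(STAGE_BUDGET_MS.values())
slack_ms = END_TO_END_BUDGET_MS - stage_total

# Hypothetical measurements, for illustration only.
sample = {"ingestion": 8, "processing": 120, "storage": 20, "serving": 60}
over, headroom = check_budget(sample)
print(stage_total, slack_ms, over, headroom)
```

In this made-up sample the pipeline is still inside the end-to-end budget while one stage exceeds its own limit, which is why per-stage and total latency are worth tracking separately.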
Technology Stack
| Layer | Technology | Use Case |
|---|---|---|
| Ingestion | Kafka, Kinesis, Pulsar | Event streaming |
| Processing | Flink, Spark Streaming, Kafka Streams | Real-time transformations |
| Storage | Redis, DynamoDB, Cassandra | Low-latency reads |
| Serving | gRPC, REST API, GraphQL | Real-time predictions |
Kafka for Ingestion
Producers → Kafka Cluster → Consumers

Topics:
- user_events (100 partitions)
- product_views (50 partitions)
- purchases (50 partitions)
- recommendations (25 partitions)

Configuration for 500K events/sec:
- 100 partitions on the highest-volume topic (user_events)
- Replication factor: 3
- Batch size: 64 KB
- Compression: snappy
- ACKs: all
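A partition count like the one above usually comes out of a back-of-the-envelope calculation. The following is a rough sketch: the 1 KB average event size, the 10,000 events/sec per-partition rate, and the 2x headroom factor are assumptions, not measurements, and should be replaced with numbers from your own benchmarks.

```python
import math


def partitions_needed(events_per_sec, per_partition_events_per_sec, headroom=2.0):
    """Partitions required to absorb the target rate with a safety factor."""
    return math.ceil(events_per_sec * headroom / per_partition_events_per_sec)


def cluster_write_mb_per_sec(events_per_sec, avg_event_bytes, replication_factor):
    """Total broker write bandwidth including replica copies, before compression."""
    return events_per_sec * avg_event_bytes * replication_factor / 1_000_000


EVENTS_PER_SEC = 500_000
AVG_EVENT_BYTES = 1_000        # assumption: roughly 1 KB per JSON event
PER_PARTITION_RATE = 10_000    # assumption: measure this on your own hardware

n = partitions_needed(EVENTS_PER_SEC, PER_PARTITION_RATE)
bw = cluster_write_mb_per_sec(EVENTS_PER_SEC, AVG_EVENT_BYTES, replication_factor=3)
per_partition = EVENTS_PER_SEC / 100   # load per partition with 100 partitions
print(n, bw, per_partition)
```

Under these assumptions, 100 partitions each carry 5,000 events/sec, and replication factor 3 triples the write bandwidth the brokers must sustain.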
Flink for Processing
Source → Transformations → Sink

Transformations:
- Filter: Remove irrelevant events
- Map: Transform event format
- KeyBy: Partition by user_id
- Window: Aggregate over time windows
- Join: Combine multiple streams

State Management:
- RocksDB state backend
- Checkpointing for fault tolerance
- Watermarks for event time processing
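To make the Filter, KeyBy, and Window steps concrete, here is a plain-Python sketch of what a keyed tumbling-window count computes. It is not Flink code; the event dictionaries, the `ts_ms` field, and the 5-minute window size are assumptions chosen for illustration.

```python
from collections import defaultdict

WINDOW_MS = 5 * 60 * 1000  # assumed 5-minute tumbling windows


def window_start(ts_ms, size_ms=WINDOW_MS):
    """Start of the tumbling window that contains ts_ms."""
    return ts_ms - (ts_ms % size_ms)


def aggregate(events):
    """Filter -> keyBy(user_id) -> tumbling window -> count."""
    counts = defaultdict(int)
    for e in events:
        if e["event_type"] not in {"product_view", "purchase"}:  # Filter
            continue
        key = (e["user_id"], window_start(e["ts_ms"]))           # KeyBy + Window
        counts[key] += 1                                          # Aggregate
    return dict(counts)


events = [
    {"user_id": "u1", "event_type": "product_view", "ts_ms": 10_000},
    {"user_id": "u1", "event_type": "product_view", "ts_ms": 299_999},
    {"user_id": "u1", "event_type": "purchase", "ts_ms": 300_000},
    {"user_id": "u2", "event_type": "heartbeat", "ts_ms": 20_000},
]
result = aggregate(events)
print(result)
```

The event at 299,999 ms falls in the first window and the one at 300,000 ms opens the next, because tumbling windows are half-open intervals that never overlap.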
Redis for Serving
Data Structures:
- Hashes: User profiles
- Sorted Sets: Recommendation scores
- Lists: Recent views
- Sets: Product categories

Configuration:
- Cluster mode for scalability
- Persistence: AOF for durability
- Eviction: LRU for memory management

Latency: sub-millisecond for simple reads on the server; network round trips add to this
Throughput: > 1M ops/sec is reachable across a cluster, especially with pipelining
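Cluster mode assigns every key to one of 16384 hash slots using CRC16 of the key, and only the part inside `{...}` is hashed when a key contains a non-empty hash tag. The sketch below reimplements that rule in plain Python to show how a hash tag keeps one user's keys on the same slot; the key names are illustrative.

```python
def crc16_xmodem(data: bytes) -> int:
    """CRC16 variant used by Redis Cluster (polynomial 0x1021, initial value 0)."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def key_slot(key: str) -> int:
    """Hash slot for a key, honoring a non-empty {hash tag} if present."""
    k = key.encode("utf-8")
    start = k.find(b"{")
    if start != -1:
        end = k.find(b"}", start + 1)
        if end != -1 and end != start + 1:
            k = k[start + 1:end]
    return crc16_xmodem(k) % 16384


user_id = "user_123"
# Doubled braces emit literal { } inside an f-string.
profile_key = f"user:{{{user_id}}}:profile"
views_key = f"user:{{{user_id}}}:recent_views"
print(profile_key, key_slot(profile_key), key_slot(views_key))
```

Keys that share a slot can be used together in multi-key commands and transactions. Without a hash tag, a user's profile, recent views, and recommendations may land on different nodes.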
Exactly-Once Processing
Kafka → Flink → Redis

1. Kafka: Idempotent producer + transactions
2. Flink: Checkpointing (two-phase commit applies only to transactional sinks such as Kafka)
3. Redis: Idempotent writes. SET and HSET are idempotent; INCR, HINCRBY, ZINCRBY, and LPUSH are not, so replayed events must be deduplicated (for example by event_id) or written as absolute values

Flow:
- Flink reads from Kafka
- Processes events
- Writes to Redis; the Kafka offsets are stored in the checkpoint
- On failure: replay from the last checkpoint, so events processed after that checkpoint reach Redis a second time
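Replay after a failure is where increment-style writes go wrong. The sketch below uses an in-memory stand-in for Redis (the `ProfileStore` class is hypothetical) to compare a plain increment with an increment guarded by an event-ID check. In Redis, the guard would typically be a `SET event_id 1 NX` with a TTL, executed atomically with the increment.

```python
class ProfileStore:
    """In-memory stand-in for Redis; the dict and set are illustrative only."""

    def __init__(self):
        self.views = {}
        self.seen_event_ids = set()

    def apply_naive(self, event):
        # Behaves like HINCRBY: replaying the event counts it again.
        user = event["user_id"]
        self.views[user] = self.views.get(user, 0) + 1

    def apply_idempotent(self, event):
        # Skip events that were already applied before the failure.
        if event["event_id"] in self.seen_event_ids:
            return False
        self.seen_event_ids.add(event["event_id"])
        self.apply_naive(event)
        return True


batch = [{"event_id": f"e{i}", "user_id": "u1"} for i in range(3)]
# Failure after the checkpoint: the last two events are replayed.
replayed = batch + batch[1:]

naive, safe = ProfileStore(), ProfileStore()
for e in replayed:
    naive.apply_naive(e)
    safe.apply_idempotent(e)
print(naive.views["u1"], safe.views["u1"])
```

The naive store overcounts by the number of replayed events, while the guarded store ends with the true count. The price is extra memory for the seen IDs, which is why a TTL slightly longer than the checkpoint interval is a common choice.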
Code Implementation
Kafka Producer
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError
import json
import uuid
from datetime import datetime, timezone

# ============================================================
# KAFKA PRODUCER
# ============================================================
# Create topic
admin_client = KafkaAdminClient(bootstrap_servers='kafka:9092')
topics = [
    NewTopic(
        name='user_events',
        num_partitions=100,
        replication_factor=3,
        # kafka-python names this argument topic_configs
        topic_configs={
            'retention.ms': str(7 * 24 * 60 * 60 * 1000),  # 7 days
            'compression.type': 'snappy',
            'cleanup.policy': 'delete'
        }
    )
]
try:
    admin_client.create_topics(new_topics=topics)
except TopicAlreadyExistsError:
    pass  # topic already exists from an earlier run

# Producer with idempotency. enable_idempotence needs a recent kafka-python
# release; older versions reject the option.
producer = KafkaProducer(
    bootstrap_servers='kafka:9092',
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',
    enable_idempotence=True,
    retries=2147483647,
    max_in_flight_requests_per_connection=5,
    linger_ms=10,
    batch_size=65536,
    compression_type='snappy'
)

def send_user_event(user_id, event_type, metadata):
    """Send user event to Kafka"""
    # UTC, millisecond precision, no offset suffix
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    event = {
        'user_id': user_id,
        'event_type': event_type,
        'metadata': metadata,
        'timestamp': now.isoformat(timespec='milliseconds'),
        # A UUID avoids collisions when one user emits two events in the same millisecond
        'event_id': f"{user_id}_{uuid.uuid4().hex}"
    }
    # Partition by user_id for ordering guarantee
    producer.send('user_events', key=user_id, value=event)
    return event['event_id']

# Example usage
for i in range(1000000):
    user_id = f"user_{i % 10000}"
    event_type = 'product_view'
    metadata = {'product_id': f'product_{i % 1000}', 'category': 'electronics'}
    send_user_event(user_id, event_type, metadata)
    if i % 10000 == 0:
        producer.flush()
        print(f"Sent {i} events")
producer.flush()
Flink Processing
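// Flink processing pipeline (Java, Flink 1.x DataStream API)
// UserEvent, UserEventSchema, UserProfile, and UserProfileRedisMapper are
// application classes that are not shown here.
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.util.Collector;

public class UserEventProcessor {
    public static void main(String[] args) throws Exception {
        // Setup Flink environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every minute. EXACTLY_ONCE covers Flink state; the Redis
        // sink is not transactional, so its writes must be idempotent.
        env.enableCheckpointing(60000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // Read from Kafka
        KafkaSource<UserEvent> source = KafkaSource.<UserEvent>builder()
            .setBootstrapServers("kafka:9092")
            .setTopics("user_events")
            .setGroupId("flink-processor")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new UserEventSchema())
            .build();

        // Event-time windows only fire once watermarks advance.
        // getTimestampMillis() is an assumed accessor on UserEvent.
        WatermarkStrategy<UserEvent> watermarks = WatermarkStrategy
            .<UserEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, ts) -> event.getTimestampMillis());
        DataStream<UserEvent> events = env.fromSource(source, watermarks, "user_events");

        // Process events
        DataStream<UserProfile> profiles = events
            .keyBy(UserEvent::getUserId)
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .process(new UserProfileUpdater());

        // Write to Redis (Apache Bahir connector). The host is a placeholder;
        // use FlinkJedisClusterConfig for Redis Cluster.
        FlinkJedisPoolConfig redisConfig = new FlinkJedisPoolConfig.Builder()
            .setHost("redis")
            .setPort(6379)
            .build();
        profiles.addSink(new RedisSink<>(redisConfig, new UserProfileRedisMapper()));

        env.execute("User Event Processor");
    }
}

// Custom process function
class UserProfileUpdater extends ProcessWindowFunction<UserEvent, UserProfile, String, TimeWindow> {
    @Override
    public void process(String userId, Context context, Iterable<UserEvent> events, Collector<UserProfile> out) {
        // Aggregate events into user profile
        UserProfile profile = new UserProfile(userId);
        for (UserEvent event : events) {
            profile.update(event);
        }
        out.collect(profile);
    }
}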
PyFlink Processing
# ============================================================
# PYFLINK PROCESSING
# ============================================================
from pyflink.table import EnvironmentSettings, TableEnvironment

# Create Flink table environment
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)

# Enable checkpointing
t_env.get_config().set("execution.checkpointing.interval", "60s")
t_env.get_config().set("execution.checkpointing.mode", "EXACTLY_ONCE")

# Define Kafka source. The producer writes the event time to a `timestamp`
# field as ISO-8601 text, so the JSON format is told to parse that layout and
# event_time is derived from it.
t_env.execute_sql("""
    CREATE TABLE user_events (
        event_id STRING,
        user_id STRING,
        event_type STRING,
        metadata ROW<product_id STRING, category STRING>,
        `timestamp` TIMESTAMP(3),
        event_time AS `timestamp`,
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_events',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'flink-processor',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json',
        'json.timestamp-format.standard' = 'ISO-8601'
    )
""")

# Define Redis sink. Flink ships no official Redis SQL connector, so the option
# names below depend on the third-party connector you install. A profile maps
# to a hash write (HSET); XADD would append to a stream instead.
t_env.execute_sql("""
    CREATE TABLE user_profiles (
        user_id STRING,
        total_views BIGINT,
        total_purchases BIGINT,
        favorite_category STRING,
        last_active TIMESTAMP(3),
        PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
        'connector' = 'redis',
        'redis.mode' = 'cluster',
        'redis.hosts' = 'redis-cluster:7000,redis-cluster:7001,redis-cluster:7002',
        'redis.command' = 'HSET'
    )
""")

# Process events. Flink SQL has no MODE() aggregate, so the most recent
# category stands in for the favorite here; a true most-frequent category needs
# per-category counts followed by a Top-1 query.
t_env.execute_sql("""
    INSERT INTO user_profiles
    SELECT
        user_id,
        COUNT(CASE WHEN event_type = 'product_view' THEN 1 END) AS total_views,
        COUNT(CASE WHEN event_type = 'purchase' THEN 1 END) AS total_purchases,
        LAST_VALUE(metadata.category) AS favorite_category,
        MAX(event_time) AS last_active
    FROM user_events
    GROUP BY user_id
""").wait()  # block so a locally run job is not torn down when the script exits
Redis Serving
# ============================================================
# REDIS SERVING
# ============================================================
from typing import Dict, List

from redis.cluster import ClusterNode, RedisCluster

class RecommendationService:
    def __init__(self, client=None):
        # redis-py expects ClusterNode objects (not dicts) for startup_nodes
        self.redis = client or RedisCluster(
            startup_nodes=[
                ClusterNode("redis-cluster", 7000),
                ClusterNode("redis-cluster", 7001),
                ClusterNode("redis-cluster", 7002),
            ],
            decode_responses=True
        )

    def get_recommendations(self, user_id: str, num_recommendations: int = 10) -> List[Dict]:
        """Get real-time recommendations for user"""
        # Top-N products by score from the user's sorted set
        recommendations = self.redis.zrevrange(
            f"user:{user_id}:recommendations",
            0,
            num_recommendations - 1,
            withscores=True
        )
        # Fetch product details in one pipelined batch instead of one round trip each
        pipe = self.redis.pipeline()
        for product_id, _ in recommendations:
            pipe.hgetall(f"product:{product_id}")
        product_infos = pipe.execute()
        # Format results
        results = []
        for (product_id, score), product_info in zip(recommendations, product_infos):
            results.append({
                'product_id': product_id,
                'score': score,
                'name': product_info.get('name'),
                'category': product_info.get('category'),
                'price': float(product_info.get('price', 0))
            })
        return results

    def update_user_profile(self, user_id: str, event: Dict):
        """Update user profile with new event"""
        # These increments are not idempotent: applying the same event twice
        # counts it twice, so deduplicate upstream if events can be replayed.
        pipe = self.redis.pipeline()
        # Update profile
        if event.get('event_type') == 'product_view':
            pipe.hincrby(f"user:{user_id}:profile", "total_views", 1)
        pipe.hset(f"user:{user_id}:profile", "last_active", event['timestamp'])
        # Add to recent views
        pipe.lpush(f"user:{user_id}:recent_views", event['product_id'])
        pipe.ltrim(f"user:{user_id}:recent_views", 0, 99)  # Keep last 100
        # Update recommendation scores
        pipe.zincrby(f"user:{user_id}:recommendations", 1, event['product_id'])
        # Execute pipeline
        pipe.execute()

    def get_user_profile(self, user_id: str) -> Dict:
        """Get user profile"""
        return self.redis.hgetall(f"user:{user_id}:profile")

# Example usage
service = RecommendationService()
# Get recommendations
recommendations = service.get_recommendations("user_123", num_recommendations=5)
print(f"Recommendations: {recommendations}")
# Update profile with new event
event = {
    'product_id': 'product_456',
    'event_type': 'product_view',
    'timestamp': '2024-01-15T10:30:00'
}
service.update_user_profile("user_123", event)
End-to-End Pipeline
# ============================================================
# END-TO-END REAL-TIME PIPELINE
# ============================================================
import json
import uuid
from datetime import datetime, timezone
from typing import Dict, List

from kafka import KafkaProducer, KafkaConsumer
from redis.cluster import ClusterNode, RedisCluster

class RealTimePipeline:
    def __init__(self, bootstrap_servers: str = 'kafka:9092', redis_client=None):
        self.bootstrap_servers = bootstrap_servers
        # Kafka producer (keys are strings, so they need a serializer too)
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            key_serializer=lambda k: k.encode('utf-8'),
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            acks='all',
            enable_idempotence=True
        )
        # Redis client; pass one in (for example in tests) or default to the cluster
        self.redis = redis_client or RedisCluster(
            startup_nodes=[ClusterNode("redis-cluster", 7000)],
            decode_responses=True
        )

    def ingest_event(self, user_id: str, event_type: str, metadata: Dict):
        """Ingest event into pipeline"""
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        event = {
            'user_id': user_id,
            'event_type': event_type,
            'metadata': metadata,
            'timestamp': now.isoformat(timespec='milliseconds'),
            'event_id': f"{user_id}_{uuid.uuid4().hex}"
        }
        # Send to Kafka
        self.producer.send('user_events', key=user_id, value=event)
        return event['event_id']

    def run_consumer(self, group_id: str = 'recs-processor', timeout_ms: int = 1000):
        """Consume user_events and apply each event to Redis (at-least-once)"""
        consumer = KafkaConsumer(
            'user_events',
            bootstrap_servers=self.bootstrap_servers,
            group_id=group_id,
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            value_deserializer=lambda b: json.loads(b.decode('utf-8')),
            consumer_timeout_ms=timeout_ms  # stop iterating when the topic is idle
        )
        try:
            for message in consumer:
                self.process_event(message.value)
                consumer.commit()  # commit only after the Redis write succeeded
        finally:
            consumer.close()

    def process_event(self, event: Dict):
        """Process event and update recommendations"""
        user_id = event['user_id']
        metadata = event['metadata']
        # Read similar products before opening the pipeline
        similar_products = self.get_similar_products(metadata['product_id'])
        pipe = self.redis.pipeline()
        # Update user profile
        pipe.hincrby(f"user:{user_id}:profile", "total_views", 1)
        pipe.hset(f"user:{user_id}:profile", "last_active", event['timestamp'])
        # Add to recent views
        pipe.lpush(f"user:{user_id}:recent_views", metadata['product_id'])
        pipe.ltrim(f"user:{user_id}:recent_views", 0, 99)
        # Update recommendation scores
        pipe.zincrby(f"user:{user_id}:recommendations", 1, metadata['product_id'])
        for product_id in similar_products:
            pipe.zincrby(f"user:{user_id}:recommendations", 0.5, product_id)
        pipe.execute()

    def get_similar_products(self, product_id: str) -> List[str]:
        """Get similar products (simplified)"""
        # In production, use collaborative filtering or content-based similarity
        return list(self.redis.smembers(f"product:{product_id}:similar"))

    def get_recommendations(self, user_id: str, num_recommendations: int = 10) -> List[Dict]:
        """Get recommendations for user"""
        recommendations = self.redis.zrevrange(
            f"user:{user_id}:recommendations",
            0,
            num_recommendations - 1,
            withscores=True
        )
        results = []
        for product_id, score in recommendations:
            product_info = self.redis.hgetall(f"product:{product_id}")
            results.append({
                'product_id': product_id,
                'score': score,
                'name': product_info.get('name'),
                'price': float(product_info.get('price', 0))
            })
        return results

# Usage
pipeline = RealTimePipeline()
# Ingest events
for i in range(1000000):
    user_id = f"user_{i % 10000}"
    product_id = f"product_{i % 1000}"
    pipeline.ingest_event(user_id, 'product_view', {
        'product_id': product_id,
        'category': 'electronics'
    })
    if i % 10000 == 0:
        print(f"Ingested {i} events")
# Make sure buffered events reach the broker before processing them
pipeline.producer.flush()
pipeline.run_consumer()
Monitoring
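# ============================================================
# MONITORING
# ============================================================
import time

import redis
from kafka import KafkaConsumer, TopicPartition
from prometheus_client import start_http_server, Counter, Gauge, Histogram

# Prometheus metrics
EVENTS_PROCESSED = Counter('events_processed_total', 'Total events processed')
EVENTS_FAILED = Counter('events_failed_total', 'Total events failed')
# A histogram keeps the latency distribution, so p95/p99 can be derived
PROCESSING_LATENCY = Histogram('processing_latency_seconds', 'Processing latency')
KAFKA_LAG = Gauge('kafka_lag', 'Kafka consumer lag', ['topic', 'partition'])
REDIS_UP = Gauge('redis_up', 'Whether Redis answered the last ping')

# Start metrics server
start_http_server(8000)

def monitor_pipeline(topic='user_events', group_id='flink-processor', interval_s=10):
    """Monitor pipeline health"""
    # Reuse one consumer and one Redis client. The consumer uses the processing
    # job's group id only to read its committed offsets; it never subscribes,
    # so it does not trigger a rebalance.
    consumer = KafkaConsumer(
        bootstrap_servers='kafka:9092',
        group_id=group_id,
        enable_auto_commit=False
    )
    redis_client = redis.Redis(host='redis-cluster', port=7000)
    while True:
        # Lag = log end offset minus the offset the group last committed
        partition_ids = consumer.partitions_for_topic(topic) or []
        partitions = [TopicPartition(topic, p) for p in partition_ids]
        end_offsets = consumer.end_offsets(partitions)
        for tp in partitions:
            committed = consumer.committed(tp) or 0
            KAFKA_LAG.labels(topic=topic, partition=tp.partition).set(end_offsets[tp] - committed)
        # Check Redis health
        try:
            redis_client.ping()
            REDIS_UP.set(1)
        except redis.RedisError:
            REDIS_UP.set(0)
        time.sleep(interval_s)

# Start monitoring
monitor_pipeline()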
Production Tip: For 500K events/second, use: (1) 100+ Kafka partitions, (2) Flink with RocksDB state backend, (3) Redis Cluster with 6+ nodes, (4) Horizontal scaling with Kubernetes.
Common Follow-Up Questions
Q1: How do you handle backpressure?
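# Flink backpressure handling
# Flink propagates backpressure on its own through credit-based flow control;
# there is no "backpressure strategy" option to set. Two settings help while a
# job is backpressured (option names vary slightly across Flink versions):
# - unaligned checkpoints, so barriers are not stuck behind buffered records
t_env.get_config().set("execution.checkpointing.unaligned", "true")
# - buffer debloating, set in the cluster configuration rather than per job:
#     taskmanager.network.memory.buffer-debloat.enabled: true
# Kafka consumer backpressure: bound how much each poll() hands to the application
from kafka import KafkaConsumer
consumer = KafkaConsumer(
    'user_events',
    bootstrap_servers='kafka:9092',
    max_poll_records=500,
    max_poll_interval_ms=300000
)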
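The idea behind backpressure is a bounded buffer between a fast producer and a slow consumer. The sketch below shows that mechanism with Python's standard `queue` module; it is an illustration of the principle only, not of Flink's or Kafka's internals, and the buffer size of 3 is arbitrary.

```python
import queue

buffer = queue.Queue(maxsize=3)  # bounded buffer between producer and consumer


def try_produce(item):
    """Return False instead of blocking when the consumer has fallen behind."""
    try:
        buffer.put_nowait(item)
        return True
    except queue.Full:
        return False


# The consumer is idle, so the buffer fills and further puts are refused.
accepted = [try_produce(i) for i in range(5)]
# The consumer catches up by one item, which frees one slot.
drained = buffer.get_nowait()
accepted_after = try_produce(99)
print(accepted, drained, accepted_after)
```

A producer that receives `False` has to slow down, retry later, or shed load. With a blocking `put` the slowdown happens implicitly, which is closer to how a stream processor stalls upstream operators.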
Q2: How do you handle late events?
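# Flink watermarks for late events
# The 5-second watermark declared on user_events bounds how long a window waits
# for out-of-order events. Rows that arrive after the watermark has passed the
# end of their window are dropped by the windowed aggregation.
t_env.execute_sql("""
    SELECT
        user_id,
        window_start,
        window_end,
        COUNT(*) AS event_count
    FROM TABLE(
        TUMBLE(TABLE user_events, DESCRIPTOR(event_time), INTERVAL '5' MINUTE))
    GROUP BY user_id, window_start, window_end
""").print()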
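How a watermark separates out-of-order events from late ones can be shown without Flink. The sketch below is plain Python under assumed parameters: 5-minute windows, 5 seconds of tolerated out-of-orderness, and a hypothetical `WatermarkTracker` class. It approximates the rule that an event is late once the watermark has passed the end of its window plus any allowed lateness.

```python
WINDOW_MS = 300_000  # assumed 5-minute tumbling windows


def window_end(ts_ms, size_ms=WINDOW_MS):
    """Exclusive end of the tumbling window that contains ts_ms."""
    return ts_ms - (ts_ms % size_ms) + size_ms


class WatermarkTracker:
    def __init__(self, max_out_of_orderness_ms=5_000, allowed_lateness_ms=0):
        self.max_ts = None
        self.delay = max_out_of_orderness_ms
        self.allowed_lateness = allowed_lateness_ms

    @property
    def watermark(self):
        # The watermark trails the highest timestamp seen so far.
        return None if self.max_ts is None else self.max_ts - self.delay

    def observe(self, ts_ms):
        """Classify the event against the current watermark, then advance it."""
        wm = self.watermark
        late = wm is not None and wm >= window_end(ts_ms) + self.allowed_lateness
        self.max_ts = ts_ms if self.max_ts is None else max(self.max_ts, ts_ms)
        return "late" if late else "on_time"


timestamps = [299_000, 304_000, 298_000, 306_000, 299_500]
strict = [WatermarkTracker().observe(t) for t in []]  # placeholder, replaced below
tracker = WatermarkTracker()
strict = [tracker.observe(t) for t in timestamps]
lenient_tracker = WatermarkTracker(allowed_lateness_ms=2_000)
lenient = [lenient_tracker.observe(t) for t in timestamps]
print(strict)
print(lenient)
```

The event at 298,000 ms arrives out of order but is still accepted because the watermark has not yet reached the end of its window. The event at 299,500 ms is late under the strict tracker and accepted once 2 seconds of lateness are allowed.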
Q3: How do you monitor real-time pipelines?
# Metrics to monitor
- Kafka lag (consumer lag)
- Processing latency (end-to-end)
- Throughput (events/second)
- Error rate
- Redis memory usage
- Redis hit/miss rate
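Several of these metrics reduce to small formulas. The sketch below computes consumer lag, a nearest-rank latency percentile, and a cache hit rate in plain Python; all input numbers are made up, and the function names are illustrative. For the hit rate, Redis exposes the raw counters as `keyspace_hits` and `keyspace_misses` in `INFO stats`.

```python
import math


def consumer_lag(end_offsets, committed_offsets):
    """Per-partition lag: log end offset minus the group's committed offset."""
    return {p: end_offsets[p] - committed_offsets.get(p, 0) for p in end_offsets}


def percentile(samples, pct):
    """Nearest-rank percentile; pct in (0, 100]."""
    ordered = sorted(samples)
    rank = math.ceil(pct / 100 * len(ordered))
    return ordered[rank - 1]


def hit_rate(hits, misses):
    """Fraction of lookups served from the cache."""
    total = hits + misses
    return hits / total if total else 0.0


# Hypothetical inputs, for illustration only.
lag = consumer_lag({0: 1500, 1: 900}, {0: 1200, 1: 900})
latencies_ms = [12, 15, 11, 250, 14, 13, 16, 12, 18, 17]
p50 = percentile(latencies_ms, 50)
p99 = percentile(latencies_ms, 99)
rate = hit_rate(950, 50)
print(lag, p50, p99, rate)
```

The single 250 ms outlier is invisible in the median but dominates p99, which is why tail percentiles rather than averages are the usual alerting signal for latency.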
Q4: How do you test real-time pipelines?
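# Integration test with test containers
import json

from kafka import KafkaConsumer
from testcontainers.kafka import KafkaContainer
from testcontainers.redis import RedisContainer

def test_real_time_pipeline():
    with KafkaContainer() as kafka, RedisContainer() as redis_container:
        # Setup pipeline with test containers. Assumes RealTimePipeline accepts
        # a bootstrap server and an injected (single-node) Redis client.
        pipeline = RealTimePipeline(
            bootstrap_servers=kafka.get_bootstrap_server(),
            redis_client=redis_container.get_client(decode_responses=True),
        )
        # Ingest a test event and make sure it reaches the broker
        pipeline.ingest_event("test_user", "product_view", {"product_id": "test_product"})
        pipeline.producer.flush()
        # Consume and process it; give up after 10 s without a message
        consumer = KafkaConsumer(
            "user_events",
            bootstrap_servers=kafka.get_bootstrap_server(),
            auto_offset_reset="earliest",
            value_deserializer=lambda b: json.loads(b.decode("utf-8")),
            consumer_timeout_ms=10000,
        )
        for message in consumer:
            pipeline.process_event(message.value)
            break
        consumer.close()
        # Verify results
        recommendations = pipeline.get_recommendations("test_user")
        assert len(recommendations) > 0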
Critical Consideration: Real-time pipelines are harder to debug than batch pipelines. Always implement: (1) comprehensive logging, (2) dead letter queues for failed events, (3) idempotent processing, and (4) monitoring/alerting.
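A dead letter queue can be sketched in a few lines: events that fail parsing or validation are set aside with the error attached instead of stopping the pipeline. In the sketch below, the `process_with_dlq` and `handler` functions and the in-memory `dead_letters` list are hypothetical; in a real pipeline the dead letters would usually go to a separate Kafka topic.

```python
import json


def process_with_dlq(raw_events, handler):
    """Apply handler to each event; park events that cannot be processed."""
    processed, dead_letters = [], []
    for raw in raw_events:
        try:
            processed.append(handler(json.loads(raw)))
        except (ValueError, KeyError) as exc:
            # Keep the original payload and the reason, so the event can be
            # inspected and replayed after a fix.
            dead_letters.append({"raw": raw, "error": f"{type(exc).__name__}: {exc}"})
    return processed, dead_letters


def handler(event):
    """Minimal validation: both fields must be present."""
    return (event["user_id"], event["event_type"])


raw_events = [
    '{"user_id": "u1", "event_type": "product_view"}',
    'not json',
    '{"event_type": "purchase"}',
]
processed, dead_letters = process_with_dlq(raw_events, handler)
print(processed)
print(dead_letters)
```

Storing the raw payload alongside the error keeps the failure debuggable, and one malformed event no longer blocks the events behind it.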
Company-Specific Tips
Uber Interview Tips
- Discuss real-time location tracking
- Explain surge pricing calculations
- Mention ride matching algorithms
- Talk about fraud detection in real-time
Netflix Interview Tips
- Focus on content recommendations
- Explain viewing history processing
- Mention real-time metrics for A/B testing
- Talk about personalization pipelines
Amazon Interview Tips
- Discuss product recommendations
- Explain dynamic pricing calculations
- Mention real-time inventory management
- Talk about fraud detection pipelines
Final Takeaway: Real-time pipelines require careful architecture choices. Use Kafka for ingestion, Flink for processing, and Redis for serving. Always consider: (1) latency requirements, (2) throughput requirements, (3) exactly-once semantics, and (4) fault tolerance.