Lambda vs Kappa Architecture
Designing real-time and batch data processing systems
Interview Question
"Design a real-time analytics platform for a ride-sharing company. The system must: (1) process 1M events/second, (2) provide sub-second latency for driver location tracking, (3) support batch analytics for historical trends, (4) handle exactly-once processing. Compare Lambda and Kappa architectures and justify your choice."
Difficulty: Hard | Frequently asked at Uber, LinkedIn, Netflix, Stripe
Theoretical Foundation
Batch Processing
Batch processing collects data over time and processes it in large, scheduled jobs.
Characteristics:
- High latency (minutes to hours)
- High throughput (terabytes per job)
- Efficient resource utilization
- Error recovery by reprocessing (rerun the job on the same input; costly for large inputs)
- Well-established tools (Spark, Hadoop, MapReduce)
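Mathematical model:
- Latency (worst case): $L_{\text{batch}} \approx T + t_{\text{job}}$
- Throughput: $\text{Throughput}_{\text{batch}} = D / T$
- Where $D$ = data volume, $T$ = collection period, $t_{\text{job}}$ = job runtime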
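Plugging the interview's 1M events/second requirement into this model shows why a batch layer alone cannot meet the latency target. Assuming hourly batches ($T = 3600$ s), with $D$ the data volume per batch and $t_{\text{job}}$ the job runtime:

```latex
\begin{aligned}
D &= 10^{6}\ \tfrac{\text{events}}{\text{s}} \times 3600\ \text{s} = 3.6 \times 10^{9}\ \text{events per batch} \\
\text{Throughput}_{\text{batch}} &= D / T = 10^{6}\ \text{events/s} \\
L_{\text{batch}} &\approx T + t_{\text{job}} \geq 3600\ \text{s}
\end{aligned}
```

However fast the job runs, the freshest results can be more than an hour stale, which is why sub-second driver tracking needs a speed layer or a pure streaming design.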
Stream Processing
Stream processing handles data as it arrives, producing results in real-time.
Characteristics:
- Low latency (milliseconds to seconds)
- Continuous processing
- State management required
- Complex fault tolerance
- Tools: Kafka Streams, Flink, Spark Streaming
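Mathematical model:
- Latency: $L_{\text{stream}} \approx t_{\text{event}}$ (per-event processing time; no collection period to wait for)
- Throughput: bounded by the processing rate $\mu$ (events/second the processor can sustain)
- Backpressure occurs when $\lambda > \mu$, where $\lambda$ is the event arrival rate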
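A fluid approximation makes the backpressure condition concrete: with constant rates, the backlog grows by (lambda - mu) events every second, and draining it requires spare capacity. A minimal sketch; the function names and the 800k and 1.2M events/s processor rates are illustrative assumptions, not measurements.

```python
import math

def backlog_after(arrival_rate, service_rate, seconds, initial_backlog=0.0):
    """Events waiting after `seconds`, assuming constant rates (fluid approximation).

    The backlog grows by (arrival_rate - service_rate) per second and never goes below zero.
    """
    return max(0.0, initial_backlog + (arrival_rate - service_rate) * seconds)

def time_to_drain(backlog, arrival_rate, service_rate):
    """Seconds to clear `backlog` once the processor outpaces arrivals; inf if it never does."""
    if service_rate <= arrival_rate:
        return math.inf
    return backlog / (service_rate - arrival_rate)

# 1M events/s arriving while the processor sustains only 800k events/s for one minute
lag = backlog_after(1_000_000, 800_000, 60)
print(lag)                                        # 12000000.0
# Scaling the processor to 1.2M events/s gives 200k events/s of headroom
print(time_to_drain(lag, 1_000_000, 1_200_000))   # 60.0
```

Note the asymmetry: a one-minute slowdown of 20% takes another full minute to recover from, even with 20% spare capacity.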
Lambda Architecture
Lambda architecture processes data through both batch and stream layers simultaneously.
Components:
- Batch Layer (Master Dataset)
  - Stores all raw data
  - Runs batch jobs (hourly/daily)
  - Produces batch views
- Speed Layer (Real-time)
  - Processes recent data
  - Low-latency updates
  - Compensates for batch latency
- Serving Layer
  - Merges batch and real-time views
  - Responds to queries
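Stripped of infrastructure, the three layers follow a simple contract: the batch view covers everything before the last batch cutoff, the speed view covers everything after it, and the serving layer combines the two. A toy in-memory sketch; the `(driver_id, event_time)` tuple layout and the cutoff values are assumptions for illustration.

```python
from collections import Counter

# Toy master dataset: (driver_id, event_time_in_seconds)
EVENTS = [("d1", 10), ("d2", 50), ("d1", 120), ("d2", 130), ("d1", 190)]

def batch_view(events, cutoff):
    """Batch layer: recompute ride counts from scratch over all events before `cutoff`."""
    return Counter(driver for driver, ts in events if ts < cutoff)

def speed_view(events, cutoff):
    """Speed layer: counts only for events the last batch run has not covered yet."""
    return Counter(driver for driver, ts in events if ts >= cutoff)

def serve(events, cutoff):
    """Serving layer: batch view + speed view over non-overlapping time ranges."""
    return batch_view(events, cutoff) + speed_view(events, cutoff)

print(dict(serve(EVENTS, cutoff=100)))  # {'d1': 3, 'd2': 2}
```

When the next batch run advances the cutoff, the speed layer's results for that period are discarded and replaced by recomputed batch results: the served answer stays the same, but any speed-layer errors for that period are corrected.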
Pros:
- Fault tolerant (batch layer reprocesses)
- Handles late-arriving data
- Supports complex analytics
Cons:
- Duplicate logic (batch + stream)
- Complex to maintain
- Consistency challenges
Kappa Architecture
Kappa architecture processes all data through a single stream processing layer.
Key principles:
- All data goes through Kafka (or similar log)
- Stream processor handles all transformations
- Reprocessing = replay from offset
- No separate batch layer
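The "reprocessing = replay from offset" principle fits in a few lines: the log is an append-only list whose index plays the role of the offset, and fixing a bug means running new logic over the same log from offset 0 rather than maintaining a second batch codebase. A toy sketch; the event fields and the v1/v2 logic are hypothetical.

```python
# Append-only log: list index plays the role of the Kafka offset
LOG = [
    {"driver_id": "d1", "status": "completed"},
    {"driver_id": "d2", "status": "cancelled"},
    {"driver_id": "d1", "status": "completed"},
]

def run_job(log, counts_event, start_offset=0):
    """The single stream job: fold events from start_offset into per-driver counts."""
    view = {}
    for offset in range(start_offset, len(log)):
        event = log[offset]
        if counts_event(event):
            view[event["driver_id"]] = view.get(event["driver_id"], 0) + 1
    return view

def count_all(event):          # v1 logic: every ride event counts
    return True

def count_completed(event):    # v2 logic (bug fix): only completed rides count
    return event["status"] == "completed"

live_view = run_job(LOG, count_all)
# Reprocessing: run v2 from offset 0 over the same log, then switch readers to it
new_view = run_job(LOG, count_completed, start_offset=0)
print(live_view)  # {'d1': 2, 'd2': 1}
print(new_view)   # {'d1': 2}
```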
Pros:
- Single codebase (no duplication)
- Simpler to maintain
- Consistent logic
Cons:
- Requires robust log retention
- Stream processor must handle all workloads
- Reprocessing can be slow for large historical data
Comparison Matrix
| Aspect | Lambda | Kappa |
|---|---|---|
| Code Duplication | High (batch + stream) | Low (stream only) |
| Complexity | High | Medium |
| Fault Tolerance | Excellent (batch reprocess) | Good (log replay) |
| Late Data | Handled by batch layer | Watermarks (bounded lateness) |
| Reprocessing | Fast (batch) | Slow (replay log) |
| Consistency | Challenging | Easier |
| Operational Cost | Higher | Lower |
| Team Skills | Requires batch + stream | Requires stream only |
Windowing Strategies
Stream processing requires windowing to group events:
Tumbling Window:
Events: [e1, e2, e3, e4, e5, e6, e7, e8, e9]
Window size: 5 seconds
Window 1: [e1, e2, e3] (0-5s)
Window 2: [e4, e5, e6] (5-10s)
Window 3: [e7, e8, e9] (10-15s)
Sliding Window:
Events: [e1, e2, e3, e4, e5, e6, e7, e8, e9]
Window size: 5 seconds, slide: 2 seconds
Window 1: [e1, e2, e3] (0-5s)
Window 2: [e2, e3, e4] (2-7s)
Window 3: [e3, e4, e5] (4-9s)
Session Window:
Events: [e1, e2, gap, e3, e4, gap, e5]
Session timeout: 10 seconds
Session 1: [e1, e2] (active for 5s, then gap > 10s)
Session 2: [e3, e4] (active for 3s, then gap > 10s)
Session 3: [e5] (active until timeout)
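Window assignment itself is integer arithmetic on event time. The sketch below uses integer seconds and the sizes from the examples above (5 s windows, 2 s slide, 10 s session timeout). The function names are illustrative; it follows the common epoch-aligned convention rather than reproducing any particular engine's code.

```python
def tumbling(ts, size):
    """The single fixed window [start, start + size) containing ts."""
    start = (ts // size) * size
    return [(start, start + size)]

def sliding(ts, size, slide):
    """Every window [start, start + size), with start on the slide grid, that contains ts."""
    windows = []
    start = (ts // slide) * slide   # latest window start at or before ts
    while start > ts - size:         # this window still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)

def sessions(timestamps, timeout):
    """Split event times into sessions; a gap of `timeout` or more starts a new session."""
    result = []
    for ts in sorted(timestamps):
        if result and ts - result[-1][-1] < timeout:
            result[-1].append(ts)
        else:
            result.append([ts])
    return result

print(tumbling(7, 5))                     # [(5, 10)]
print(sliding(6, 5, 2))                   # [(2, 7), (4, 9), (6, 11)]
print(sessions([0, 5, 20, 23, 40], 10))   # [[0, 5], [20, 23], [40]]
```

With a slide smaller than the window, each event lands in `size / slide` windows (here up to 3), which is why sliding aggregations cost more state than tumbling ones.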
Exactly-Once Semantics
Exactly-once processing ensures each event is processed exactly once, despite failures.
Three components:
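- At-least-once delivery: Kafka retains events, and consumers resume from the last committed offset after a failure (so some events may be redelivered)
- Idempotent processing: processing the same event twice yields the same result
- Transactional writes: output and consumed offsets are committed atomically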
Key Insight: Exactly-once semantics is achieved through a combination of: (1) Kafka's idempotent producer, (2) transactional consumer offsets, (3) idempotent sink operations, and (4) transactional writes. In practice, most systems achieve effectively-once semantics through idempotency.
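The idempotency half of effectively-once processing can be shown in memory: if a consumer crashes after writing but before committing its offset, the same batch is delivered again, and the sink must absorb the duplicate. A minimal sketch, assuming the ID is derived from Kafka's (topic, partition, offset) coordinates; the class and field names are hypothetical.

```python
import hashlib

def event_id(topic, partition, offset):
    """Deterministic ID from Kafka coordinates: a redelivered record maps to the same ID."""
    return hashlib.sha256(f"{topic}:{partition}:{offset}".encode()).hexdigest()

class IdempotentSink:
    """In-memory stand-in for a table keyed by event_id (think upsert or MERGE)."""

    def __init__(self):
        self.rows = {}

    def write(self, records):
        for r in records:
            # setdefault: a second write of the same key leaves the stored row unchanged
            self.rows.setdefault(event_id(r["topic"], r["partition"], r["offset"]), r)

batch = [
    {"topic": "ride_events", "partition": 0, "offset": 41, "fare": 12.5},
    {"topic": "ride_events", "partition": 0, "offset": 42, "fare": 8.0},
]
sink = IdempotentSink()
sink.write(batch)
sink.write(batch)  # crash before committing offsets -> at-least-once redelivery
print(len(sink.rows))                              # 2
print(sum(r["fare"] for r in sink.rows.values()))  # 20.5
```

A random ID generated per processing attempt (such as a fresh UUID) would break this, because a redelivered event would look new to the sink.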
Backpressure Handling
When the stream processor can't keep up with incoming data:
Handling strategies:
- Drop events: Accept data loss
- Buffer events: Increase memory usage
- Slow down source: Apply backpressure to producers
- Scale up processor: Add more resources
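The first three strategies differ only in where the excess goes. A tick-based toy simulation makes the trade-off visible; the rates, buffer capacity, and policy names are illustrative assumptions. With the same processor, dropping loses data while backpressure leaves it waiting at the source.

```python
def simulate(produce_per_tick, consume_per_tick, ticks, capacity, policy):
    """Simulate a source feeding a processor through a bounded buffer.

    policy="drop": overflow is discarded (data loss, bounded memory).
    policy="backpressure": overflow stays at the source and is offered again next tick.
    Returns (processed, dropped, held_at_source).
    """
    if policy not in ("drop", "backpressure"):
        raise ValueError(f"unknown policy: {policy}")
    buffered = held = processed = dropped = 0
    for _ in range(ticks):
        offered = produce_per_tick + held
        accepted = min(offered, capacity - buffered)
        overflow = offered - accepted
        if policy == "drop":
            dropped += overflow
            held = 0
        else:
            held = overflow
        buffered += accepted
        done = min(buffered, consume_per_tick)  # processor capacity per tick
        buffered -= done
        processed += done
    return processed, dropped, held

print(simulate(10, 6, 4, 8, "drop"))          # (24, 14, 0)
print(simulate(10, 6, 4, 8, "backpressure"))  # (24, 0, 14)
```

With Kafka as the source, "held at source" is simply consumer lag in the retained log, which is why slowing the source is usually the safe default; the lag still has to be worked off by scaling up the processor.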
Code Implementation
Lambda Architecture Implementation
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = SparkSession.builder \
    .appName("LambdaArchitecture") \
    .getOrCreate()

# ============================================================
# SPEED LAYER: Real-time processing
# ============================================================
# Kafka source for real-time events (JSON payload with this schema)
schema = StructType([
    StructField("ride_id", StringType()),
    StructField("driver_id", StringType()),
    StructField("pickup_lat", DoubleType()),
    StructField("pickup_lon", DoubleType()),
    StructField("timestamp", TimestampType())
])

realtime_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "ride_events") \
    .load() \
    .select(F.from_json(F.col("value").cast("string"), schema).alias("data")) \
    .select("data.*")

# Real-time aggregation in 5-minute event-time windows
realtime_agg = realtime_df \
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(
        F.window("timestamp", "5 minutes"),
        "driver_id"
    ) \
    .agg(
        F.count("*").alias("ride_count"),
        F.avg("pickup_lat").alias("avg_lat"),
        F.avg("pickup_lon").alias("avg_lon")
    )

# Write to real-time view. The Delta sink supports "append" and "complete" but not
# "update"; with the watermark, "append" emits each window once the watermark passes
# its end. For fresher partial results, use "update" with foreachBatch and a Delta MERGE.
realtime_query = realtime_agg.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "s3://checkpoints/realtime_agg/") \
    .start("s3://lakehouse/speed/ride_aggregates/")

# ============================================================
# BATCH LAYER: Historical processing
# ============================================================
def batch_job(execution_date):
    """Daily batch job: recompute the batch view from all raw data before execution_date.

    execution_date is a date string such as "2024-01-15"; it becomes the batch cutoff.
    """
    # Read all raw data (the immutable master dataset)
    raw_data = spark.read.parquet("s3://data-lake/raw/ride_events/")

    # Same aggregations as the speed layer, over full history at daily granularity
    batch_agg = raw_data \
        .filter(F.col("timestamp") < F.lit(execution_date).cast("timestamp")) \
        .groupBy(
            F.window("timestamp", "1 day"),
            "driver_id"
        ) \
        .agg(
            F.count("*").alias("ride_count"),
            F.avg("pickup_lat").alias("avg_lat"),
            F.avg("pickup_lon").alias("avg_lon")
        )

    # Write to batch view (full recomputation replaces the previous version)
    batch_agg.write.format("delta") \
        .mode("overwrite") \
        .save("s3://lakehouse/batch/ride_aggregates/")

# ============================================================
# SERVING LAYER: Merge real-time and batch
# ============================================================
def merge_views(batch_cutoff):
    """Serve batch results before batch_cutoff and speed-layer results from it on.

    batch_cutoff must equal the execution_date of the last completed batch_job run.
    """
    # Read batch view (complete history up to the cutoff)
    batch_view = spark.read.format("delta") \
        .load("s3://lakehouse/batch/ride_aggregates/")

    # Read real-time view, keeping only windows the batch run has not covered yet
    realtime_view = spark.read.format("delta") \
        .load("s3://lakehouse/speed/ride_aggregates/") \
        .filter(F.col("window.start") >= F.lit(batch_cutoff).cast("timestamp"))

    # The time ranges do not overlap, so a plain union neither drops nor double-counts
    # rows. Granularity differs (1-day vs 5-minute windows): queries should sum
    # ride_count per driver over the range they need.
    return batch_view.unionByName(realtime_view)
Kappa Architecture Implementation
# ============================================================
# KAPPA ARCHITECTURE: Single stream processor
# ============================================================
import json

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("KappaArchitecture") \
    .getOrCreate()

# Single Kafka source for all data ("earliest" lets a new job replay the retained log).
# `schema` is the ride-event schema defined in the Lambda example.
all_events = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "ride_events") \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(F.from_json(F.col("value").cast("string"), schema).alias("data")) \
    .select("data.*")

# ============================================================
# STREAM PROCESSOR: Handles all transformations
# ============================================================
def create_aggregations(df, window_size, slide_size=None):
    """Per-driver aggregations over event-time windows (sizes in seconds).

    Tumbling when slide_size is None, sliding otherwise. The watermark tolerates
    events up to two window lengths late.
    """
    slide = f"{slide_size} seconds" if slide_size else None  # None -> tumbling window
    return df \
        .withWatermark("timestamp", f"{window_size * 2} seconds") \
        .groupBy(
            F.window("timestamp", f"{window_size} seconds", slide),
            "driver_id"
        ) \
        .agg(
            F.count("*").alias("ride_count"),
            F.avg("pickup_lat").alias("avg_lat"),
            F.avg("pickup_lon").alias("avg_lon")
        )

# Create multiple aggregations from the same source
agg_5min = create_aggregations(all_events, 300)            # 5-minute tumbling
agg_1hr = create_aggregations(all_events, 3600)            # 1-hour tumbling
agg_5min_slide = create_aggregations(all_events, 300, 60)  # 5-minute window, 1-minute slide

# Write each aggregation to its own sink ("append": the Delta sink does not support "update")
def start_sink(df, name):
    return df.writeStream \
        .format("delta") \
        .outputMode("append") \
        .option("checkpointLocation", f"s3://checkpoints/agg_{name}/") \
        .start(f"s3://lakehouse/aggregates/{name}/")

query_5min = start_sink(agg_5min, "5min")
query_1hr = start_sink(agg_1hr, "1hr")
query_5min_slide = start_sink(agg_5min_slide, "5min_slide")

# ============================================================
# REPROCESSING: Replay from Kafka offset
# ============================================================
def offsets_json(offsets):
    """Per-partition offsets JSON for the Kafka source, e.g. {"ride_events": {"0": 1000}}."""
    return json.dumps({"ride_events": {str(p): o for p, o in offsets.items()}})

def reprocess_data(start_offsets, end_offsets):
    """Reprocess a bounded offset range by replaying it from Kafka as a batch read.

    start_offsets / end_offsets map partition number -> offset, e.g. {0: 1000, 1: 980}.
    When specific offsets are given, the Kafka source expects every partition of the
    topic to be listed, not just partition 0.
    """
    reprocess_df = spark.read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("subscribe", "ride_events") \
        .option("startingOffsets", offsets_json(start_offsets)) \
        .option("endingOffsets", offsets_json(end_offsets)) \
        .load() \
        .select(F.from_json(F.col("value").cast("string"), schema).alias("data")) \
        .select("data.*")

    # Apply the same transformation (withWatermark is ignored on batch DataFrames)
    reprocessed = create_aggregations(reprocess_df, 300)

    # Write to a separate location, then switch readers over once it is verified
    reprocessed.write.format("delta") \
        .mode("overwrite") \
        .save("s3://lakehouse/reprocessed/5min/")
Exactly-Once Processing with Kafka
# ============================================================
# EXACTLY-ONCE SEMANTICS
# ============================================================
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("ExactlyOnceProcessing") \
    .config("spark.sql.streaming.checkpointLocation", "s3://checkpoints/") \
    .getOrCreate()

# Kafka source. Spark records consumed offsets in the query checkpoint (not in
# consumer-group commits), which is what makes replay after a failure exact.
streaming_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "transactions") \
    .option("startingOffsets", "earliest") \
    .load()

# Deterministic event_id from Kafka coordinates: a replayed record gets the SAME id.
# (uuid() would mint a new id on every retry and defeat downstream deduplication.)
# Assumes `schema` describes the transaction payload.
processed_df = streaming_df \
    .withColumn("event_id", F.sha2(F.concat_ws(
        ":", F.col("topic"), F.col("partition").cast("string"), F.col("offset").cast("string")
    ), 256)) \
    .select("event_id", F.from_json(F.col("value").cast("string"), schema).alias("data")) \
    .select("event_id", "data.*") \
    .withColumn("processed_at", F.current_timestamp())  # metadata only; differs on replay

# Write with Delta Lake (exactly-once together with the checkpoint)
query = processed_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "s3://checkpoints/exactly_once/") \
    .option("mergeSchema", "true") \
    .start("s3://lakehouse/processed/transactions/")

# Delta Lake guarantees:
# 1. The transaction log makes each micro-batch write atomic
# 2. The checkpoint records which offsets each micro-batch consumed
# 3. On failure, Spark replays from the last checkpoint, and the Delta sink skips
#    any micro-batch it has already committed, so nothing is written twice
Backpressure Handling
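# ============================================================
# BACKPRESSURE CONFIGURATION
# ============================================================
# Structured Streaming rate limiting is a *source* option: maxOffsetsPerTrigger caps
# how many Kafka records each micro-batch reads. (maxRatePerPartition belongs to the
# legacy DStream API, and neither option has any effect when set on writeStream.)
limited_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "transactions") \
    .option("maxOffsetsPerTrigger", 100000) \
    .load()

query = limited_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "s3://checkpoints/backpressure/") \
    .start("s3://lakehouse/processed/")

# Monitor processing rate via query progress (checkpoint files are an internal format)
progress = query.lastProgress  # None until the first micro-batch completes
if progress:
    print(f"{progress['numInputRows']} rows in {progress['durationMs']['triggerExecution']} ms")

# Dynamic adjustment based on micro-batch processing time (ms)
def adjust_rate(current_rate, processing_time, target_time=1000):
    """Return a new maxOffsetsPerTrigger; applying it requires restarting the query."""
    if processing_time > target_time:
        return int(current_rate * 0.8)  # Reduce by 20%
    elif processing_time < target_time * 0.5:
        return int(current_rate * 1.2)  # Increase by 20%
    return current_rate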
Late Data Handling
# ============================================================
# LATE DATA HANDLING
# ============================================================
# With watermarking (all_events comes from the Kappa example)
watermarked_df = all_events \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        F.window("timestamp", "5 minutes"),
        "driver_id"
    ) \
    .agg(F.count("*").alias("ride_count"))
# Events at most 10 minutes behind the max event time seen are still aggregated;
# later events may be dropped once their window's state has been finalized.

# For Lambda architecture, late data is handled by the batch layer
def handle_late_data(late_df, cutoff_date):
    """Persist late-arriving events and return the 5-minute windows they affect.

    Assumes late_df holds events that arrived after the stream finalized their
    windows, and cutoff_date (e.g. "2024-01-15") marks the already-published period.
    """
    # Late events belonging to periods that were already published
    late_records = late_df.filter(
        F.col("timestamp") < F.lit(cutoff_date).cast("timestamp")
    )

    # Store them so the next batch run includes them in its recomputation
    late_records.write.format("delta") \
        .mode("append") \
        .save("s3://lakehouse/late_data/")

    # Windows whose published aggregates are now stale and need recomputing
    affected_windows = late_records.select(
        F.window("timestamp", "5 minutes").alias("window")
    ).distinct()

    return affected_windows
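The watermark rule itself is small. The sketch below is a simplified model under stated assumptions: it advances the watermark after every event (Spark advances it between micro-batches), uses tumbling windows, and treats any event whose window has already closed as dropped. The function name is hypothetical.

```python
def apply_watermark(arrivals, delay, window_size):
    """Simplified per-event watermark model (Spark updates the watermark per micro-batch).

    arrivals: event times in the order they arrive. An event is dropped when its
    tumbling window has already closed, i.e. window end <= max event time seen - delay.
    """
    max_event_time = float("-inf")
    accepted, dropped = [], []
    for ts in arrivals:
        watermark = max_event_time - delay
        window_end = (ts // window_size + 1) * window_size
        if window_end <= watermark:
            dropped.append(ts)   # window already finalized: too late
        else:
            accepted.append(ts)  # still within the lateness allowance
        max_event_time = max(max_event_time, ts)
    return accepted, dropped

# 5-minute windows, 10-minute watermark, times in seconds
print(apply_watermark([0, 400, 1000, 250, 900, 100], delay=600, window_size=300))
# ([0, 400, 1000, 900], [250, 100])
```

The event at 900 s is out of order but its window is still open, so it is kept; the events at 250 s and 100 s belong to the [0, 300) window, which closed once the watermark reached 400 s.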
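Production Tip: For Kappa architecture, size Kafka retention (or tiered storage) to cover the history you may need to replay. Log compaction, which keeps only the latest record per key, suits changelog topics such as current driver location: replaying one cheaply rebuilds current state, but it discards the intermediate events that historical aggregates need. For Lambda, use Delta Lake's time travel for efficient reprocessing.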
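The difference between compaction and full retention shows up in what a replay can rebuild. A toy sketch with hypothetical driver-location records: replaying the compacted log reproduces the latest location per driver, but the intermediate positions are gone.

```python
def compact(log):
    """Keep only the latest value per key, in offset order; a None value is a tombstone.

    Simplification: real Kafka compaction runs in the background on closed segments
    and keeps tombstones for delete.retention.ms before removing them.
    """
    latest = {}
    for offset, (key, value) in enumerate(log):
        latest[key] = (offset, value)
    survivors = sorted(latest.items(), key=lambda item: item[1][0])  # by offset
    return [(key, value) for key, (_, value) in survivors if value is not None]

def replay(log):
    """Rebuild current state (latest value per key) by replaying a log from the start."""
    state = {}
    for key, value in log:
        if value is None:
            state.pop(key, None)
        else:
            state[key] = value
    return state

locations = [
    ("driver_1", (40.71, -74.00)),
    ("driver_2", (34.05, -118.24)),
    ("driver_1", (40.72, -74.01)),
    ("driver_3", (41.88, -87.63)),
    ("driver_3", None),  # driver_3 went offline: tombstone
]
print(compact(locations))
# [('driver_2', (34.05, -118.24)), ('driver_1', (40.72, -74.01))]
```

`replay(compact(locations))` equals `replay(locations)`, but an aggregate such as "distance driven by driver_1" can no longer be recomputed from the compacted log.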
Common Follow-Up Questions
Q1: When would you choose Lambda over Kappa?
Choose Lambda when:
- You have existing batch infrastructure
- Complex analytics require full history
- Team has batch processing expertise
- Late-arriving data is common
- Regulatory requirements need batch audit trails
Choose Kappa when:
- Real-time is the primary requirement
- Team has strong streaming expertise
- You want simpler operations
- Code duplication is a concern
- All data can be retained in Kafka
Q2: How do you handle schema evolution in streaming?
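# Schema evolution in Spark Structured Streaming
from pyspark.sql import functions as F
from pyspark.sql.avro.functions import from_avro  # needs the spark-avro package

streaming_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "events") \
    .load()

# Use schema evolution with Delta Lake: mergeSchema lets new columns in the written
# DataFrame (e.g. a parsed payload that gained fields) be added to the table schema
query = streaming_df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "s3://checkpoints/events/") \
    .option("mergeSchema", "true") \
    .start("s3://lakehouse/events/")

# For Avro-encoded Kafka topics: from_avro takes the Avro schema as a JSON string.
# Hypothetical schema; it must match what the producer wrote. Confluent Schema
# Registry payloads also carry a 5-byte header that plain from_avro does not strip.
schema_string = '{"type": "record", "name": "Event", "fields": [{"name": "ride_id", "type": "string"}, {"name": "driver_id", "type": "string"}]}'
avro_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "events") \
    .load() \
    .select(from_avro(F.col("value"), schema_string).alias("data"))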
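Whatever the serialization format, compatible evolution follows the reader-side rules that Avro's schema resolution encodes: fields added later need defaults so older events still parse, and readers ignore fields they do not know. A format-agnostic sketch over plain dictionaries; the field names, including `surge_multiplier`, are hypothetical.

```python
REQUIRED_FIELDS = ("ride_id", "driver_id")
OPTIONAL_DEFAULTS = {"surge_multiplier": 1.0}  # hypothetical field added in schema v2

def read_event(raw):
    """Project a raw event onto the reader's schema.

    - Missing required field: incompatible, raise (e.g. route it to a dead letter queue).
    - Missing optional field (older writer): fill in the default.
    - Unknown field (newer writer): ignore it.
    """
    missing = [f for f in REQUIRED_FIELDS if f not in raw]
    if missing:
        raise ValueError(f"incompatible event, missing {missing}")
    event = {f: raw[f] for f in REQUIRED_FIELDS}
    for field, default in OPTIONAL_DEFAULTS.items():
        event[field] = raw.get(field, default)
    return event

v1 = {"ride_id": "r1", "driver_id": "d1"}
v3 = {"ride_id": "r2", "driver_id": "d2", "surge_multiplier": 1.8, "eta_s": 240}
print(read_event(v1))  # {'ride_id': 'r1', 'driver_id': 'd1', 'surge_multiplier': 1.0}
print(read_event(v3))  # {'ride_id': 'r2', 'driver_id': 'd2', 'surge_multiplier': 1.8}
```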
Q3: How do you monitor streaming pipelines?
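# Monitoring metrics
def monitor_streaming_query(query, alert=print):
    """Monitor streaming query health; `alert` is a placeholder for your alerting hook."""
    if not query.isActive:
        alert(f"Streaming query has stopped: {query.exception()}")
        return

    # Current status: {'message': ..., 'isDataAvailable': ..., 'isTriggerActive': ...}
    status = query.status
    print(f"Status: {status['message']}")

    # Metrics of the most recent micro-batch (None before the first one completes)
    progress = query.lastProgress
    if progress:
        print(f"Batch duration: {progress['durationMs']['triggerExecution']}ms")
        print(f"Input rows: {progress['numInputRows']}")
        print(f"Input rate: {progress['inputRowsPerSecond']} rows/s")
        print(f"Processing rate: {progress['processedRowsPerSecond']} rows/s")

        # Alert on issues
        if progress["numInputRows"] == 0:
            alert("Streaming query is not processing any rows")
        if progress["processedRowsPerSecond"] < progress["inputRowsPerSecond"]:
            alert("Streaming query is falling behind its input rate")

    if not status["isDataAvailable"]:
        alert("Streaming query has no data available")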
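Keeping the alert rules separate from Spark makes them easy to unit-test. A sketch that evaluates one progress report; it assumes the dictionary shape PySpark 3.x returns from `query.lastProgress` (`numInputRows`, `inputRowsPerSecond`, `processedRowsPerSecond`, `durationMs.triggerExecution`), and the 1000 ms target and sample numbers are illustrative. The "processing rate below input rate" rule is the backpressure condition (arrival rate above processing rate) measured directly.

```python
def check_progress(progress, max_batch_ms=1000):
    """Return alert messages for one progress report (a dict shaped like lastProgress)."""
    if progress is None:
        return ["no micro-batch has completed yet"]
    alerts = []
    if progress["numInputRows"] == 0:
        alerts.append("no input rows in the last micro-batch")
    if progress["processedRowsPerSecond"] < progress["inputRowsPerSecond"]:
        alerts.append("falling behind: processing rate below input rate")
    if progress["durationMs"]["triggerExecution"] > max_batch_ms:
        alerts.append("micro-batch slower than target")
    return alerts

# 1.2M rows processed in 1.5 s = 800k rows/s, while input arrives at 1M rows/s
sample = {
    "numInputRows": 1_200_000,
    "inputRowsPerSecond": 1_000_000.0,
    "processedRowsPerSecond": 800_000.0,
    "durationMs": {"triggerExecution": 1500},
}
print(check_progress(sample))
# ['falling behind: processing rate below input rate', 'micro-batch slower than target']
```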
Q4: How do you test streaming pipelines?
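# Test the streaming logic on a static DataFrame: deterministic, fast, and free of
# Kafka and sleep-based timing. Reuses `spark`, `schema`, and `create_aggregations`
# from the examples above (withWatermark is ignored on batch DataFrames).
from datetime import datetime

def test_streaming_pipeline():
    """Test the aggregation logic with controlled input"""
    # Create test data matching the ride-event schema
    test_data = [
        ("ride_1", "driver_1", 40.7128, -74.0060, datetime(2024, 1, 15, 10, 0, 0)),
        ("ride_2", "driver_1", 40.7128, -74.0060, datetime(2024, 1, 15, 10, 1, 0)),
        ("ride_3", "driver_2", 34.0522, -118.2437, datetime(2024, 1, 15, 10, 2, 0)),
    ]
    input_df = spark.createDataFrame(test_data, schema)

    # Run the same transformation the streaming job uses (5-minute tumbling windows)
    result = create_aggregations(input_df, 300)

    # Both driver_1 rides fall in the 10:00-10:05 window: one row with ride_count 2
    counts = {row["driver_id"]: row["ride_count"] for row in result.collect()}
    assert counts == {"driver_1": 2, "driver_2": 1}

# For an end-to-end test through Kafka, call query.processAllAvailable()
# instead of time.sleep() before checking the sink.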
Critical Consideration: Streaming pipelines are harder to debug than batch pipelines. Always implement: (1) comprehensive logging, (2) dead letter queues for failed events, (3) idempotent processing, and (4) monitoring/alerting. Test with production-like data volumes before deploying.
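Of these safeguards, the dead letter queue is the easiest to sketch: a bad record is set aside with its payload and error instead of failing the whole job. A minimal in-memory version; in production the dead letters would typically go to a separate Kafka topic, and the function name and record layout here are hypothetical.

```python
import json

def process_with_dlq(raw_messages, handle):
    """Apply `handle` to each raw message; failures go to a dead letter queue, not a crash.

    Each dead letter keeps the original payload and the error so it can be fixed and replayed.
    """
    results, dead_letters = [], []
    for offset, raw in enumerate(raw_messages):
        try:
            results.append(handle(json.loads(raw)))
        except (ValueError, KeyError) as exc:  # json.JSONDecodeError is a ValueError
            dead_letters.append({"offset": offset, "payload": raw, "error": repr(exc)})
    return results, dead_letters

messages = ['{"driver_id": "d1"}', "not json", '{"ride_id": "r1"}']
ok, dlq = process_with_dlq(messages, lambda event: event["driver_id"])
print(ok)                          # ['d1']
print([d["offset"] for d in dlq])  # [1, 2]
```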
Company-Specific Tips
Uber Interview Tips
- Discuss real-time location tracking architecture
- Explain exactly-once for ride matching
- Mention dynamic pricing real-time calculations
- Talk about surge pricing streaming aggregations
LinkedIn Interview Tips
- Focus on real-time recommendations
- Discuss newsfeed streaming architecture
- Mention connection suggestions real-time updates
- Talk about spam detection in real-time
Netflix Interview Tips
- Discuss content recommendation real-time updates
- Explain viewing history streaming processing
- Mention A/B testing real-time metrics
- Talk about personalization streaming pipelines
Final Takeaway: Lambda architecture is more mature and fault-tolerant, but Kappa is simpler to maintain. For most new projects, Kappa is preferred unless you have specific requirements for batch reprocessing. The key is understanding your latency, throughput, and fault tolerance requirements before choosing.