
Lambda vs Kappa Architecture


Uber & LinkedIn Interview


Designing real-time and batch data processing systems

Interview Question

"Design a real-time analytics platform for a ride-sharing company. The system must: (1) process 1M events/second, (2) provide sub-second latency for driver location tracking, (3) support batch analytics for historical trends, (4) handle exactly-once processing. Compare Lambda and Kappa architectures and justify your choice."

Difficulty: Hard | Frequently asked at Uber, LinkedIn, Netflix, Stripe


Theoretical Foundation

Batch Processing

Batch processing collects data over time and processes it in large, scheduled jobs.

Characteristics:

  • High latency (minutes to hours)
  • High throughput (terabytes per job)
  • Efficient resource utilization
  • Complex error handling (reprocessing)
  • Well-established tools (Spark, Hadoop, MapReduce)
Figure: Batch processing pipeline (Data Sources → Ingestion → Storage → Processing → Output). Data collected during hours 1 and 2 is processed in hour 3. Latency: O(hours). Throughput: O(TB/job).

Mathematical model:

  • Latency $L = T_{collect} + T_{process}$
  • Throughput $T = \frac{D}{T_{process}}$
  • Where $D$ = data volume, $T_{collect}$ = collection period, and $T_{process}$ = job processing time
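A quick worked example of the two batch formulas. The numbers are illustrative assumptions, not figures from the question: data is collected for an hour, and the job then takes 20 minutes to process 3.6 TB.

```python
def batch_latency_s(t_collect_s: float, t_process_s: float) -> float:
    """L = T_collect + T_process: an event arriving at the start of a collection
    period waits for the whole period plus the job runtime (worst case)."""
    return t_collect_s + t_process_s


def batch_throughput(data_bytes: float, t_process_s: float) -> float:
    """T = D / T_process, in bytes per second of job runtime."""
    return data_bytes / t_process_s


# Illustrative (assumed) numbers: hourly collection, 20-minute job, 3.6 TB per run
print(batch_latency_s(3600, 1200) / 60, "min")        # 80.0 min
print(batch_throughput(3.6e12, 1200) / 1e9, "GB/s")   # 3.0 GB/s
```

The same job delivers multi-GB/s throughput yet leaves the freshest results more than an hour stale, which is the batch trade-off in one line.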

Stream Processing

Stream processing handles data as it arrives, producing results in real time.

Characteristics:

  • Low latency (milliseconds to seconds)
  • Continuous processing
  • State management required
  • Complex fault tolerance
  • Tools: Kafka Streams, Flink, Spark Streaming
Figure: Stream processing pipeline (Sources → Kafka → Stream Processor → Sinks). Each event (e1 to e7) produces a result (r1 to r7) as it arrives. Latency: O(ms). Throughput: O(events/sec).

Mathematical model:

  • Latency $L = T_{ingest} + T_{process} + T_{output}$
  • Throughput $T = \min(T_{source}, T_{processor}, T_{sink})$, where each argument is that stage's throughput
  • Backpressure occurs when $T_{source} > T_{processor}$
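The throughput formula and the backpressure condition translate directly into a few lines of Python. Rates are in events/second. The 1M/500K figures match Scenario 1 of the backpressure section, while the sink rate and the 1.8-billion-event buffer are illustrative assumptions.

```python
def pipeline_throughput(source: float, processor: float, sink: float) -> float:
    """End-to-end throughput is capped by the slowest stage (events/sec)."""
    return min(source, processor, sink)


def backlog_growth(source: float, processor: float) -> float:
    """Events/sec piling up in front of the processor; 0 means no backpressure."""
    return max(0, source - processor)


def seconds_until_full(buffer_events: float, source: float, processor: float) -> float:
    """How long a buffer holding `buffer_events` can absorb the excess."""
    growth = backlog_growth(source, processor)
    return float("inf") if growth == 0 else buffer_events / growth


# Source 1M/s, processor 500K/s; sink rate and buffer size are assumptions
print(pipeline_throughput(1_000_000, 500_000, 2_000_000))  # 500000
print(backlog_growth(1_000_000, 500_000))                   # 500000
print(seconds_until_full(1.8e9, 1_000_000, 500_000))        # 3600.0
```

In other words, a buffer only buys time. Unless the processor catches up, the backlog grows without bound.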

Lambda Architecture

Lambda architecture processes data through both batch and stream layers simultaneously.

Figure: Lambda architecture. The speed layer (streaming → real-time view) and the batch layer (batch → batch views) both feed a serving layer, which merges them behind a query API. Query logic: result = merge(realtime_view, batch_view), where the real-time view covers the last 5 minutes and the batch view covers all history.

Components:

  1. Batch Layer (Master Dataset)

    • Stores all raw data
    • Runs batch jobs (hourly/daily)
    • Produces batch views
  2. Speed Layer (Real-time)

    • Processes recent data
    • Low-latency updates
    • Compensates for batch latency
  3. Serving Layer

    • Merges batch and real-time views
    • Responds to queries
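The serving layer's merge step can be sketched in plain Python. The source does not pin down the merge rule, so this sketch assumes one common convention: the batch view is authoritative for every window that starts before the end of the last batch run (`batch_cutoff`), and the speed layer supplies only newer windows. The `(driver_id, window_start)` key and integer ride counts are illustrative assumptions.

```python
from typing import Dict, Tuple

Key = Tuple[str, int]  # (driver_id, window_start in seconds), illustrative key shape


def merge_views(batch_view: Dict[Key, int],
                realtime_view: Dict[Key, int],
                batch_cutoff: int) -> Dict[Key, int]:
    """Batch results win for windows before the cutoff; the speed layer
    fills in only the windows the batch layer has not processed yet."""
    merged = {k: v for k, v in batch_view.items() if k[1] < batch_cutoff}
    for key, count in realtime_view.items():
        if key[1] >= batch_cutoff:
            merged[key] = count
    return merged


# The last batch run covered everything before t=600. The speed layer still holds
# an approximate count for window 300, which the batch layer has since recomputed.
batch = {("d1", 0): 4, ("d1", 300): 5}
realtime = {("d1", 300): 6, ("d1", 600): 2}
print(merge_views(batch, realtime, batch_cutoff=600))
# {('d1', 0): 4, ('d1', 300): 5, ('d1', 600): 2}
```

Letting the batch layer win on overlap is what makes Lambda "eventually correct": speed-layer approximations are overwritten once the batch job catches up.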

Pros:

  • Fault tolerant (batch layer reprocesses)
  • Handles late-arriving data
  • Supports complex analytics

Cons:

  • Duplicate logic (batch + stream)
  • Complex to maintain
  • Consistency challenges

Kappa Architecture

Kappa architecture processes all data through a single stream processing layer.

Figure: Kappa architecture. All data → Kafka (immutable log) → stream processor → views. Query logic: result = stream_processor(kafka, query_params). Reprocessing replays from a Kafka offset. Kafka retention: 7-30 days (or forever with tiered storage).

Key principles:

  1. All data goes through Kafka (or similar log)
  2. Stream processor handles all transformations
  3. Reprocessing = replay from offset
  4. No separate batch layer
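Principle 3 (reprocessing = replay from an offset) can be illustrated with a toy log. In this sketch a Python list stands in for a Kafka topic partition (list index = offset), and `build_view` is a hypothetical helper. The point is that the same code path builds both the live view and the reprocessed one, and the new view is swapped in only after it has been rebuilt.

```python
from collections import Counter
from typing import Callable, Dict, List

Event = Dict[str, str]


def build_view(log: List[Event], start_offset: int,
               key_fn: Callable[[Event], str]) -> Counter:
    """Replay the immutable log from `start_offset` and rebuild a count view.
    The same function serves live processing and reprocessing."""
    view: Counter = Counter()
    for event in log[start_offset:]:
        view[key_fn(event)] += 1
    return view


# Append-only log standing in for a Kafka partition (offset = list index)
log = [
    {"driver_id": "d1", "city": "NYC"},
    {"driver_id": "d2", "city": "LA"},
    {"driver_id": "d1", "city": "NYC"},
]

# v1 logic: rides per driver
serving = {"rides": build_view(log, 0, lambda e: e["driver_id"])}

# Logic changes (v2: rides per city). Replay from offset 0 into a new view,
# then swap it in; in a real system the old view serves queries meanwhile.
new_view = build_view(log, 0, lambda e: e["city"])
serving["rides"] = new_view
print(serving["rides"])  # Counter({'NYC': 2, 'LA': 1})
```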

Pros:

  • Single codebase (no duplication)
  • Simpler to maintain
  • Consistent logic

Cons:

  • Requires robust log retention
  • Stream processor must handle all workloads
  • Reprocessing can be slow for large historical data
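To make "reprocessing can be slow" concrete: replay time is the number of retained events divided by the replay rate. Live traffic also keeps arriving during a replay, so the job only catches up at `replay_rate - ingest_rate`. The 1M events/second comes from the interview question. The 30-day retention is the upper end of the range above, and the 5M events/second replay rate is an assumption.

```python
SECONDS_PER_DAY = 86_400


def replay_hours(ingest_rate: float, retention_days: float, replay_rate: float) -> float:
    """Hours to re-read the whole retained log at `replay_rate` events/sec."""
    retained_events = ingest_rate * retention_days * SECONDS_PER_DAY
    return retained_events / replay_rate / 3600


def catch_up_hours(ingest_rate: float, retention_days: float, replay_rate: float) -> float:
    """Hours until a reprocessing job also catches up with live traffic,
    which keeps arriving at `ingest_rate` while the replay runs."""
    if replay_rate <= ingest_rate:
        return float("inf")  # the job never catches up
    retained_events = ingest_rate * retention_days * SECONDS_PER_DAY
    return retained_events / (replay_rate - ingest_rate) / 3600


# 1M events/s ingest, 30-day retention, assumed replay rate of 5M events/s
print(replay_hours(1_000_000, 30, 5_000_000))    # 144.0
print(catch_up_hours(1_000_000, 30, 5_000_000))  # 180.0
```

Under these assumptions a full reprocessing takes about a week, which is why Kappa deployments often replay only a bounded range of offsets.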

Comparison Matrix

Aspect | Lambda | Kappa
--- | --- | ---
Code Duplication | High (batch + stream) | Low (stream only)
Complexity | High | Medium
Fault Tolerance | Excellent (batch reprocess) | Good (log replay)
Late Data | Handled by batch layer | Requires windowing
Reprocessing | Fast (batch) | Slow (replay log)
Consistency | Challenging | Easier
Operational Cost | Higher | Lower
Team Skills | Requires batch + stream | Requires stream only

Windowing Strategies

Aggregating an unbounded stream requires windows, which define which events are grouped together:

Tumbling Window:

Events: [e1, e2, e3, e4, e5, e6, e7, e8, e9]
Window size: 5 seconds

Window 1: [e1, e2, e3] (0-5s)
Window 2: [e4, e5, e6] (5-10s)
Window 3: [e7, e8, e9] (10-15s)

Sliding Window:

Events: [e1, e2, e3, e4, e5, e6, e7, e8, e9]
Window size: 5 seconds, slide: 2 seconds

Window 1: [e1, e2, e3] (0-5s)
Window 2: [e2, e3, e4] (2-7s)
Window 3: [e3, e4, e5] (4-9s)

Session Window:

Events: [e1, e2, gap, e3, e4, gap, e5]
Session timeout: 10 seconds

Session 1: [e1, e2] (active for 5s, then gap > 10s)
Session 2: [e3, e4] (active for 3s, then gap > 10s)
Session 3: [e5] (active until timeout)
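The three window types can be written as assignment rules over event timestamps. This pure-Python sketch assumes integer seconds and half-open windows `[start, end)`, and it ignores windows that would start before t = 0. Engines such as Flink and Spark implement these window types with considerably more machinery (state, late-data handling).

```python
from typing import List, Tuple

Window = Tuple[int, int]  # [start, end) in seconds


def tumbling(ts: int, size: int) -> List[Window]:
    """Each event falls in exactly one fixed, non-overlapping window."""
    start = ts - ts % size
    return [(start, start + size)]


def sliding(ts: int, size: int, slide: int) -> List[Window]:
    """An event falls in every window [start, start + size) that contains it,
    where window starts are multiples of `slide`."""
    windows = []
    start = ts - ts % slide          # latest window start <= ts
    while start > ts - size:         # window still covers ts
        if start >= 0:               # assumption: no windows before t = 0
            windows.append((start, start + size))
        start -= slide
    return sorted(windows)


def sessions(timestamps: List[int], gap: int) -> List[List[int]]:
    """A silence longer than `gap` closes the current session."""
    result: List[List[int]] = []
    for ts in sorted(timestamps):
        if result and ts - result[-1][-1] <= gap:
            result[-1].append(ts)
        else:
            result.append([ts])
    return result


print(tumbling(7, 5))                          # [(5, 10)]
print(sliding(5, 5, 2))                        # [(2, 7), (4, 9)]
print(sessions([0, 5, 20, 23, 40], gap=10))    # [[0, 5], [20, 23], [40]]
```

With a 2-second slide, an event at t = 5 lands in two overlapping windows, matching the (2-7s) and (4-9s) windows in the sliding example above.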

Exactly-Once Semantics

Exactly-once processing ensures that each event affects the output exactly once, even when failures cause it to be read or processed more than once.

Three components:

  1. At-least-once delivery: Kafka retains events and the consumer tracks offsets, so no event is lost (though some may be redelivered)
  2. Idempotent processing: Same result if processed twice
  3. Transactional writes: Atomic updates to output
Figure: Exactly-once architecture. Kafka source (read + offset) → processor (transform) → transactional sink (write + offset in a single transaction). Failure mode: if the write succeeds but the offset commit fails, the event is reprocessed and duplicated. Solution: transactional outbox pattern.

ℹ️

Key Insight: Exactly-once semantics is achieved through a combination of (1) Kafka's idempotent producer, (2) transactional consumer offsets, (3) idempotent sink operations, and (4) transactional writes. In practice, most systems achieve effectively-once semantics through idempotency.

Backpressure Handling

When the stream processor can't keep up with incoming data:

Backpressure scenarios:

  • Scenario 1: Source 1M events/sec → Processor 500K events/sec
  • Scenario 2: Processor 1M events/sec → Sink 200K events/sec
  • Scenario 3: Sink 100K events/sec → Processor 1M events/sec
  • Solutions: Kafka buffers, dynamic scaling, queue buildup, backpressure to producers

Handling strategies:

  1. Drop events: Accept data loss
  2. Buffer events: Increase memory usage
  3. Slow down source: Apply backpressure to producers
  4. Scale up processor: Add more resources
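The four strategies can be compared with a small second-by-second simulation. This is a simplified model, not how any particular engine behaves. `simulate` and `Result` are hypothetical names, the rates come from Scenario 1 above, and the 1M-event buffer capacity is an assumption.

```python
from dataclasses import dataclass


@dataclass
class Result:
    processed: int
    dropped: int
    backlog: int      # events still buffered at the end
    source_sent: int  # events the source actually emitted


def simulate(source_rate: int, processor_rate: int, buffer_cap: int,
             seconds: int, strategy: str) -> Result:
    """drop:   events that fit neither the processor nor the buffer are discarded
    buffer: the buffer is unbounded (buffer_cap ignored), so it just grows
    block:  the source is throttled to what fits (backpressure to producers)
    scale:  the processor is scaled up to the source rate (assumed instant)"""
    if strategy not in ("drop", "buffer", "block", "scale"):
        raise ValueError(f"unknown strategy: {strategy}")
    if strategy == "scale":
        processor_rate = max(processor_rate, source_rate)

    backlog = processed = dropped = sent = 0
    for _ in range(seconds):
        # How many new events can be either processed now or buffered
        room = source_rate if strategy == "buffer" else processor_rate + (buffer_cap - backlog)
        emitted = min(source_rate, room) if strategy == "block" else source_rate
        accepted = min(emitted, room)
        dropped += emitted - accepted
        sent += emitted
        available = backlog + accepted
        done = min(available, processor_rate)
        processed += done
        backlog = available - done
    return Result(processed, dropped, backlog, sent)


for strategy in ("drop", "buffer", "block", "scale"):
    print(strategy, simulate(1_000_000, 500_000, 1_000_000, seconds=4, strategy=strategy))
```

With these numbers, "drop" loses 1M events, "buffer" ends with a 2M-event backlog that keeps growing by 500K/s, "block" lets the source emit only 3M of the 4M events it wanted to send, and "scale" keeps up with no backlog.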

Code Implementation

Lambda Architecture Implementation

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = SparkSession.builder \
    .appName("LambdaArchitecture") \
    .getOrCreate()

# ============================================================
# SPEED LAYER: Real-time processing
# ============================================================

# Kafka source for real-time events
schema = StructType([
    StructField("ride_id", StringType()),
    StructField("driver_id", StringType()),
    StructField("pickup_lat", DoubleType()),
    StructField("pickup_lon", DoubleType()),
    StructField("timestamp", TimestampType())
])

realtime_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "ride_events") \
    .load() \
    .select(F.from_json(F.col("value").cast("string"), schema).alias("data")) \
    .select("data.*")

# Real-time aggregation (last 5 minutes)
realtime_agg = realtime_df \
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(
        F.window("timestamp", "5 minutes"),
        "driver_id"
    ) \
    .agg(
        F.count("*").alias("ride_count"),
        F.avg("pickup_lat").alias("avg_lat"),
        F.avg("pickup_lon").alias("avg_lon")
    )

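# Write to real-time view. The Delta streaming sink supports "append" and
# "complete" but not "update"; with a watermark, "append" emits each window
# once the watermark passes its end.
realtime_query = realtime_agg.writeStream \
    .format("delta") \
    .outputMode("append") \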
    .option("checkpointLocation", "s3://checkpoints/realtime_agg/") \
    .start("s3://lakehouse/speed/ride_aggregates/")

# ============================================================
# BATCH LAYER: Historical processing
# ============================================================

def batch_job(execution_date):
    """Daily batch job to process all historical data"""
    
    # Read all raw data
    raw_data = spark.read.parquet("s3://data-lake/raw/ride_events/")
    
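    # Same aggregation as the speed layer, recomputed over the full history.
    # Windows match the speed layer's 5-minute windows so the serving layer
    # can line the two views up window by window.
    batch_agg = raw_data \
        .groupBy(
            F.window("timestamp", "5 minutes"),
            "driver_id"
        ) \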
        .agg(
            F.count("*").alias("ride_count"),
            F.avg("pickup_lat").alias("avg_lat"),
            F.avg("pickup_lon").alias("avg_lon")
        )
    
    # Write to batch view
    batch_agg.write.format("delta") \
        .mode("overwrite") \
        .save("s3://lakehouse/batch/ride_aggregates/")

# ============================================================
# SERVING LAYER: Merge real-time and batch
# ============================================================

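def merge_views(query_date):
    """Merge real-time and batch views for one query date. The batch view is
    authoritative up to the end of its last run; the speed layer fills in
    only the windows after that cutoff."""
    
    # Read batch view (all historical data up to the last batch run)
    batch_view = spark.read.format("delta") \
        .load("s3://lakehouse/batch/ride_aggregates/")
    
    # Read real-time view (recent windows)
    realtime_view = spark.read.format("delta") \
        .load("s3://lakehouse/speed/ride_aggregates/")
    
    # Drop speed-layer windows the batch layer already covers; dropDuplicates
    # would keep an arbitrary copy when both layers hold the same window
    batch_cutoff = batch_view.agg(F.max("window.end")).first()[0]
    if batch_cutoff is not None:
        realtime_view = realtime_view.filter(F.col("window.start") >= F.lit(batch_cutoff))
    
    merged = batch_view.unionByName(realtime_view, allowMissingColumns=True)
    return merged.filter(F.to_date("window.start") == F.lit(query_date))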

Kappa Architecture Implementation

# ============================================================
# KAPPA ARCHITECTURE: Single stream processor
# ============================================================

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("KappaArchitecture") \
    .getOrCreate()

# Single Kafka source for all data (reuses the ride-event `schema` from the Lambda example)
all_events = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "ride_events") \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(F.from_json(F.col("value").cast("string"), schema).alias("data")) \
    .select("data.*")

# ============================================================
# STREAM PROCESSOR: Handles all transformations
# ============================================================

# Real-time aggregation with configurable window sizes
def create_aggregations(df, window_size, slide_size=None):
    """Per-driver aggregations over tumbling (slide_size=None) or sliding windows.
    Sizes are in seconds; the watermark tolerates events up to 2x window_size late."""
    
    # F.window treats slideDuration=None as a tumbling window, so one code path
    # covers both cases
    slide = f"{slide_size} seconds" if slide_size else None
    return df \
        .withWatermark("timestamp", f"{window_size * 2} seconds") \
        .groupBy(
            F.window("timestamp", f"{window_size} seconds", slide),
            "driver_id"
        ) \
        .agg(
            F.count("*").alias("ride_count"),
            F.avg("pickup_lat").alias("avg_lat"),
            F.avg("pickup_lon").alias("avg_lon")
        )

# Create multiple aggregations
agg_5min = create_aggregations(all_events, 300)  # 5-minute tumbling
agg_1hr = create_aggregations(all_events, 3600)  # 1-hour tumbling
agg_5min_slide = create_aggregations(all_events, 300, 60)  # 5-minute sliding, 1-minute slide

# Write aggregations to separate sinks. The Delta sink does not support
# "update" mode, so each window is appended once the watermark closes it.
query_5min = agg_5min.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "s3://checkpoints/agg_5min/") \
    .start("s3://lakehouse/aggregates/5min/")

query_1hr = agg_1hr.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "s3://checkpoints/agg_1hr/") \
    .start("s3://lakehouse/aggregates/1hr/")

# ============================================================
# REPROCESSING: Replay from Kafka offset
# ============================================================

def reprocess_data(start_offset, end_offset):
    """Reprocess data by replaying from Kafka"""
    
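    # Read a bounded offset range as a batch query. This JSON names partition 0
    # only; a multi-partition topic needs a start and end offset for every partition.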
    reprocess_df = spark.read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("subscribe", "ride_events") \
        .option("startingOffsets", f'{{"ride_events": {{"0": {start_offset}}}}}') \
        .option("endingOffsets", f'{{"ride_events": {{"0": {end_offset}}}}}') \
        .load() \
        .select(F.from_json(F.col("value").cast("string"), schema).alias("data")) \
        .select("data.*")
    
    # Apply same transformations
    reprocessed = create_aggregations(reprocess_df, 300)
    
    # Write to separate location (or overwrite)
    reprocessed.write.format("delta") \
        .mode("overwrite") \
        .save("s3://lakehouse/reprocessed/5min/")

Exactly-Once Processing with Kafka

# ============================================================
# EXACTLY-ONCE SEMANTICS
# ============================================================

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("ExactlyOnceProcessing") \
    .config("spark.sql.streaming.checkpointLocation", "s3://checkpoints/") \
    .getOrCreate()

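# Kafka is a replayable source: on restart, Spark re-reads from the offsets
# stored in the checkpoint. Spark does not commit offsets back to Kafka, so
# kafka.group.id only names the consumer group (e.g. for broker ACLs).
streaming_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "transactions") \
    .option("startingOffsets", "earliest") \
    .option("kafka.group.id", "exactly-once-processor") \
    .load()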

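# Process with idempotent operations. The event ID is derived from the event's
# Kafka coordinates, so a replayed event gets the same ID (uuid() would mint a
# new one on every retry). `schema` is a placeholder for the transaction schema.
processed_df = streaming_df \
    .select(
        F.from_json(F.col("value").cast("string"), schema).alias("data"),
        F.sha2(F.concat_ws(":", F.col("topic"), F.col("partition").cast("string"),
                           F.col("offset").cast("string")), 256).alias("event_id")
    ) \
    .select("data.*", "event_id") \
    .withColumn("processed_at", F.current_timestamp())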

# Write with Delta Lake (supports exactly-once)
query = processed_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "s3://checkpoints/exactly_once/") \
    .option("mergeSchema", "true") \
    .start("s3://lakehouse/processed/transactions/")

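# Why this is exactly-once end to end:
# 1. The checkpoint records which Kafka offsets each micro-batch covers
# 2. The Delta sink commits each micro-batch atomically through its transaction
#    log and records the batch ID, so a replayed batch is not written twice
# 3. On failure, Spark replays from the last checkpoint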

Backpressure Handling

# ============================================================
# BACKPRESSURE CONFIGURATION
# ============================================================

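# Spark Structured Streaming: rate limiting is a Kafka *source* option.
# maxOffsetsPerTrigger caps the offsets read per micro-batch (across partitions).
# maxRatePerPartition is not a Structured Streaming option; the similarly named
# spark.streaming.kafka.maxRatePerPartition applies to the legacy DStream API.
rate_limited_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "transactions") \
    .option("maxOffsetsPerTrigger", 100000) \
    .load()

query = rate_limited_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "s3://checkpoints/backpressure/") \
    .start("s3://lakehouse/processed/")

# Monitor processing rate via the query's progress reports
progress = query.lastProgress  # None until the first micro-batch completes
if progress:
    print(progress["inputRowsPerSecond"], progress["processedRowsPerSecond"])

# Dynamic adjustment based on processing time (ms). Source options are fixed when
# the query starts, so a new rate would be applied as maxOffsetsPerTrigger on restart.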
def adjust_rate(current_rate, processing_time, target_time=1000):
    """Adjust processing rate based on processing time"""
    if processing_time > target_time:
        return current_rate * 0.8  # Reduce by 20%
    elif processing_time < target_time * 0.5:
        return current_rate * 1.2  # Increase by 20%
    return current_rate

Late Data Handling

# ============================================================
# LATE DATA HANDLING
# ============================================================

# With watermarking
watermarked_df = all_events \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        F.window("timestamp", "5 minutes"),
        "driver_id"
    ) \
    .agg(F.count("*").alias("ride_count"))

# Events up to 10 minutes behind the latest event time seen still update their
# window; events later than that may be dropped

# For Lambda architecture, late data is handled by batch layer
def handle_late_data(late_df, cutoff_date):
    """Process late-arriving data"""
    
    # Select records whose event time falls before the cutoff (the late ones)
    late_records = late_df.filter(
        F.col("timestamp") < cutoff_date
    )
    
    # Persist them; the batch layer folds them into its views on the next run
    late_records.write.format("delta") \
        .mode("append") \
        .save("s3://lakehouse/late_data/")
    
    # Reprocess affected windows
    affected_windows = late_records.select(
        F.window("timestamp", "5 minutes").alias("window")
    ).distinct()
    
    return affected_windows
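The watermark rule from the comments above can be modeled in a few lines. This is a simplified sketch: it advances the watermark after every event, whereas Spark advances it between micro-batches. Spark's guarantee is also one-directional: events within the delay are never dropped, but events beyond it are not guaranteed to be dropped. The 5-minute window and 10-minute delay mirror the Spark example; `aggregate_with_watermark` is a hypothetical name.

```python
from typing import Dict, List, Optional, Tuple

WINDOW = 300  # 5-minute tumbling windows, in seconds
DELAY = 600   # 10-minute watermark delay


def aggregate_with_watermark(events: List[int]) -> Tuple[Dict[int, int], List[int]]:
    """Count events per tumbling window, dropping events that are too late.
    `events` are event timestamps (seconds) in arrival order."""
    counts: Dict[int, int] = {}
    dropped: List[int] = []
    max_seen: Optional[int] = None
    for ts in events:
        window_start = ts - ts % WINDOW
        # Watermark trails the largest event time seen so far by DELAY
        watermark = None if max_seen is None else max_seen - DELAY
        if watermark is not None and window_start + WINDOW <= watermark:
            dropped.append(ts)  # its window closed before the watermark
            continue
        counts[window_start] = counts.get(window_start, 0) + 1
        max_seen = ts if max_seen is None else max(max_seen, ts)
    return counts, dropped


print(aggregate_with_watermark([100, 1300, 250, 900, 700]))
# ({0: 1, 1200: 1, 900: 1, 600: 1}, [250])
```

After the event at t = 1300 the watermark sits at 700. The event at t = 700 still counts because its window (600-900) ends after the watermark, while the event at t = 250 is dropped because its window (0-300) ended long before. In a Lambda design, that dropped event is exactly what the batch layer later picks up.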

💡

Production Tip: For Kappa architecture, Kafka's log compaction retains only the latest record for each key, which reduces storage costs while still allowing replay. Note, however, that replaying a compacted topic rebuilds the latest state per key, not the full event history. For Lambda, use Delta Lake's time travel for efficient reprocessing.


Common Follow-Up Questions

Q1: When would you choose Lambda over Kappa?

Choose Lambda when:

  • You have existing batch infrastructure
  • Complex analytics require full history
  • Team has batch processing expertise
  • Late-arriving data is common
  • Regulatory requirements need batch audit trails

Choose Kappa when:

  • Real-time is the primary requirement
  • Team has strong streaming expertise
  • You want simpler operations
  • Code duplication is a concern
  • All data can be retained in Kafka

Q2: How do you handle schema evolution in streaming?

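# Schema evolution in Spark Structured Streaming
streaming_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "events") \
    .load()

# Use schema evolution with Delta Lake: mergeSchema lets new columns be
# added to the target table
query = streaming_df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "s3://checkpoints/events/") \
    .option("mergeSchema", "true") \
    .start("s3://lakehouse/events/")

# For Avro-encoded Kafka topics (requires the spark-avro package).
# schema_string is the writer's Avro schema as a JSON string (placeholder).
# Confluent Schema Registry payloads carry a 5-byte header that plain
# from_avro does not strip.
from pyspark.sql.avro.functions import from_avro

avro_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "events") \
    .load() \
    .select(from_avro(F.col("value"), schema_string).alias("data"))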

Q3: How do you monitor streaming pipelines?

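# Monitoring metrics
def monitor_streaming_query(query, alert=print):
    """Monitor streaming query health. `alert` is a placeholder for your notifier."""
    
    # status is a dict with 'message', 'isDataAvailable' and 'isTriggerActive'
    status = query.status
    print(f"Query active: {query.isActive}")
    print(f"Status: {status['message']}")
    
    # lastProgress holds metrics for the most recent micro-batch (None before the first)
    progress = query.lastProgress
    if progress:
        print(f"Batch duration: {progress['durationMs']['triggerExecution']}ms")
        print(f"Input rows: {progress['numInputRows']}")
        print(f"Processing rate: {progress['processedRowsPerSecond']} rows/s")
        
        if progress['numInputRows'] == 0:
            alert("Streaming query is not processing any rows")
        # Input arriving faster than it is processed means lag is building
        if progress['inputRowsPerSecond'] > progress['processedRowsPerSecond']:
            alert("Input rate exceeds processing rate")
    
    # Alert on issues
    if not query.isActive:
        alert("Streaming query has stopped")
    elif not status['isDataAvailable']:
        alert("Streaming query has no data available")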

Q4: How do you test streaming pipelines?

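import json
import time

from kafka import KafkaProducer  # kafka-python client

# Test streaming pipelines with test data
def test_streaming_pipeline():
    """Test the 5-minute aggregation end to end with controlled input"""
    producer = KafkaProducer(
        bootstrap_servers="kafka:9092",
        value_serializer=lambda v: json.dumps(v).encode(),
    )
    
    # Events are sent as JSON objects whose keys match `schema` (a bare tuple
    # would serialize to a JSON array and parse as nulls). ISO-8601 timestamps
    # match from_json's default timestamp format.
    fields = ["ride_id", "driver_id", "pickup_lat", "pickup_lon", "timestamp"]
    test_data = [
        ("ride_1", "driver_1", 40.7128, -74.0060, "2024-01-15T10:00:00"),
        ("ride_2", "driver_1", 40.7128, -74.0060, "2024-01-15T10:01:00"),
        ("ride_3", "driver_2", 34.0522, -118.2437, "2024-01-15T10:02:00"),
        # A later event advances the watermark so the 10:00-10:05 window is emitted
        ("ride_4", "driver_3", 34.0522, -118.2437, "2024-01-15T11:00:00"),
    ]
    
    # Write to Kafka
    for ride in test_data:
        producer.send("ride_events", value=dict(zip(fields, ride)))
    producer.flush()
    
    # Wait for processing (crude; when the test owns the query handle,
    # query.processAllAvailable() is more reliable)
    time.sleep(10)
    
    # Verify output: driver_1's two rides share one 5-minute window
    result = spark.read.format("delta").load("s3://lakehouse/aggregates/5min/")
    assert result.count() > 0
    driver_1 = result.filter("driver_id = 'driver_1'").collect()
    assert len(driver_1) == 1 and driver_1[0]["ride_count"] == 2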

⚠️

Critical Consideration: Streaming pipelines are harder to debug than batch pipelines. Always implement: (1) comprehensive logging, (2) dead letter queues for failed events, (3) idempotent processing, and (4) monitoring/alerting. Test with production-like data volumes before deploying.


Company-Specific Tips

Uber Interview Tips

  • Discuss real-time location tracking architecture
  • Explain exactly-once processing for ride matching
  • Mention real-time dynamic pricing calculations
  • Talk about streaming aggregations for surge pricing

LinkedIn Interview Tips

  • Focus on real-time recommendations
  • Discuss newsfeed streaming architecture
  • Mention real-time updates to connection suggestions
  • Talk about real-time spam detection

Netflix Interview Tips

  • Discuss real-time updates to content recommendations
  • Explain stream processing of viewing history
  • Mention real-time metrics for A/B testing
  • Talk about streaming personalization pipelines

ℹ️

Final Takeaway: Lambda architecture is more mature and fault-tolerant, but Kappa is simpler to maintain. For most new projects, Kappa is preferred unless you have specific requirements for batch reprocessing. The key is understanding your latency, throughput, and fault tolerance requirements before choosing.
