Serverless at Scale: Lambda, Step Functions, DynamoDB
Difficulty: Senior/Staff Level | Companies: Amazon, Netflix, Slack, Airbnb, Lyft
Interview Question
"Design a serverless data processing pipeline that handles 1 million events per second with exactly-once processing guarantees."
ℹ️ Key Concepts
This question tests your understanding of serverless patterns, event-driven architecture, and consistency models at massive scale.
Complete Serverless Architecture
Architecture Overview
SERVERLESS PROCESSING PIPELINE

[ EVENT SOURCES ]
  API Gateway │ IoT Core │ S3 Events │ Kinesis
        │
        ▼
[ INGESTION LAYER ]
  Kinesis Data Streams │ SQS │ EventBridge
  Throughput: 1M+ events/sec
        │
        ▼
[ PROCESSING LAYER ]
  Step Functions State Machine:
  Lambda (Ingest) ──▶ Lambda (Process) ──▶ Lambda (Store)
        │
        ▼
[ STORAGE LAYER ]
  DynamoDB │ S3 │ ElastiCache │ Timestream
Mathematical Foundation: Throughput Calculation
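Lambda Concurrency Model:
- Requests per second: R = 1,000,000
- Average duration: D = 100 ms = 0.1 s
- Concurrent executions needed (Little's law): C = R × D = 1,000,000 × 0.1 s = 100,000
- The default Lambda concurrency quota is 1,000 per Region, so this requires a large quota increase or fewer, larger invocations (batching)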
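You can sanity-check the concurrency figure by writing Little's law as code. This is an illustrative sketch. The batching figures (100 events per invocation, 200 ms per batch) are hypothetical and were chosen only to show how delivering events in batches, for example through a Kinesis or SQS trigger, reduces the concurrency you need.

```python
import math


def required_concurrency(requests_per_sec: float, avg_duration_ms: float) -> int:
    """Little's law: executions in flight = arrival rate x time each one runs."""
    return math.ceil(requests_per_sec * avg_duration_ms / 1000)


def invocations_per_sec(events_per_sec: float, batch_size: int) -> float:
    """Delivering batch_size events per invocation divides the invocation rate."""
    return events_per_sec / batch_size


# One event per invocation, 100 ms each
print(required_concurrency(1_000_000, 100))  # 100000
# Hypothetical batching: 100 events per invocation, each batch taking 200 ms
print(required_concurrency(invocations_per_sec(1_000_000, 100), 200))  # 2000
```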
DynamoDB Capacity:
- Write capacity units: WCU = writes/sec × ceil(item_size / 1 KB)
- Read capacity units: RCU = reads/sec × ceil(item_size / 4 KB) for strongly consistent reads (half that for eventually consistent reads)
- For 1M events/sec with 1 KB items: WCU = 1,000,000
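A small calculator makes the capacity formulas easy to check. The sketch below makes three assumptions. It treats 1 KB as 1,024 bytes. It rounds item sizes up, as DynamoDB does. It uses the per-partition ceilings of 1,000 WCU and 3,000 RCU per second to give only a rough lower bound on how many partitions must share the load; DynamoDB manages actual partition counts internally.

```python
import math


def write_capacity_units(writes_per_sec: int, item_size_bytes: int) -> int:
    """1 WCU per write for each started 1 KB of item size."""
    return writes_per_sec * math.ceil(item_size_bytes / 1024)


def read_capacity_units(reads_per_sec: int, item_size_bytes: int,
                        strongly_consistent: bool = True) -> int:
    """1 RCU per strongly consistent read per started 4 KB; eventually consistent reads cost half."""
    units = reads_per_sec * math.ceil(item_size_bytes / 4096)
    return units if strongly_consistent else math.ceil(units / 2)


def min_partitions(wcu: int, rcu: int) -> int:
    """Rough lower bound from the per-partition limits (1,000 WCU, 3,000 RCU)."""
    return max(math.ceil(wcu / 1000), math.ceil(rcu / 3000))


wcu = write_capacity_units(1_000_000, 1024)
rcu = read_capacity_units(1_000_000, 1024, strongly_consistent=False)
print(wcu, rcu, min_partitions(wcu, rcu))  # 1000000 500000 1000
```

At 1M writes/sec, traffic must spread evenly over at least about 1,000 partition key ranges. Key design therefore matters as much as raw capacity.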
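Cost Estimation (over the same period, e.g., one hour):
- Lambda cost: C_Lambda = requests × price_per_request + requests × duration_s × memory_GB × price_per_GB_second
- DynamoDB cost (provisioned mode): C_Dynamo = WCU × price_per_WCU_hour + RCU × price_per_RCU_hour
- Total cost: C_total = C_Lambda + C_Dynamo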
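Here the cost formulas are applied to one hour of traffic. The default prices are us-east-1 list prices at the time of writing and should be treated as assumptions; check current pricing:

- Lambda on x86: $0.20 per million requests and $0.0000166667 per GB-second.
- DynamoDB provisioned mode: $0.00065 per WCU-hour and $0.00013 per RCU-hour.

The sketch ignores the free tier, volume tiers and reserved capacity. It also assumes 256 MB functions, matching the Terraform example, and eventually consistent reads.

```python
def lambda_cost(requests: int, avg_duration_ms: float, memory_mb: int,
                price_per_million_requests: float = 0.20,
                price_per_gb_second: float = 0.0000166667) -> float:
    """Request charge plus compute charge (GB-seconds)."""
    gb_seconds = requests * (avg_duration_ms / 1000) * (memory_mb / 1024)
    request_charge = requests / 1_000_000 * price_per_million_requests
    return request_charge + gb_seconds * price_per_gb_second


def dynamodb_cost_per_hour(wcu: int, rcu: int,
                           price_per_wcu_hour: float = 0.00065,
                           price_per_rcu_hour: float = 0.00013) -> float:
    """Provisioned-mode cost of one hour at the given capacity."""
    return wcu * price_per_wcu_hour + rcu * price_per_rcu_hour


requests_per_hour = 1_000_000 * 3600
lambda_hourly = lambda_cost(requests_per_hour, avg_duration_ms=100, memory_mb=256)
dynamo_hourly = dynamodb_cost_per_hour(wcu=1_000_000, rcu=500_000)
print(f"Lambda ${lambda_hourly:,.2f}/h, DynamoDB ${dynamo_hourly:,.2f}/h, "
      f"total ${lambda_hourly + dynamo_hourly:,.2f}/h")
```

Under these assumptions, Lambda costs roughly $2,220 per hour: about $1,500 of compute plus $720 of request charges. DynamoDB costs $715 per hour. At this volume the per-request charge is a large share of the bill, which is one more reason to batch events per invocation.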
AWS Lambda Implementation
# Lambda function for data processing
import json
import os
import time
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal
from typing import Any, Dict

import boto3
from botocore.exceptions import ClientError


@dataclass
class EventData:
    event_id: str
    event_type: str
    timestamp: str
    data: Dict[str, Any]
    source: str


class LambdaProcessor:
    """Serverless data processor"""

    def __init__(self):
        # Created once per execution environment and reused by warm invocations
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(os.environ['TABLE_NAME'])
        self.s3 = boto3.client('s3')

    def handler(self, event: Dict[str, Any], context) -> Dict[str, Any]:
        """Main Lambda handler"""
        try:
            # Parse and validate event
            event_data = self._parse_event(event)
            # Process based on event type
            result = self._process_event(event_data)
            # Store result (idempotent, see _store_result)
            self._store_result(result)
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': 'Event processed successfully',
                    'event_id': event_data.event_id,
                }),
            }
        except Exception as e:
            # Fine behind API Gateway; for SQS/Kinesis triggers, re-raise instead
            # so the service retries the event instead of treating it as processed
            return {
                'statusCode': 500,
                'body': json.dumps({'error': str(e)}),
            }

    def _parse_event(self, event: Dict[str, Any]) -> EventData:
        """Parse and validate incoming event"""
        return EventData(
            # A generated ID differs on every retry, so producers should send event_id
            event_id=event.get('event_id') or self._generate_event_id(),
            event_type=event.get('event_type', 'unknown'),
            timestamp=event.get('timestamp') or datetime.now(timezone.utc).isoformat(),
            data=event.get('data', {}),
            source=event.get('source', 'api'),
        )

    def _process_event(self, event_data: EventData) -> Dict[str, Any]:
        """Process event based on type"""
        processors = {
            'order_created': self._process_order,
            'payment_received': self._process_payment,
            'user_signup': self._process_signup,
        }
        processor = processors.get(event_data.event_type, self._process_generic)
        return processor(event_data)

    def _process_order(self, event_data: EventData) -> Dict[str, Any]:
        """Process order event"""
        order_data = event_data.data
        # Validate order
        if not self._validate_order(order_data):
            raise ValueError("Invalid order data")
        # Calculate totals
        total = sum(
            item['price'] * item['quantity']
            for item in order_data['items']
        )
        return {
            'event_id': event_data.event_id,
            'type': 'order_processed',
            'order_id': order_data.get('order_id'),
            'total': total,
            'status': 'confirmed',
            'timestamp': event_data.timestamp,
        }

    def _process_payment(self, event_data: EventData) -> Dict[str, Any]:
        """Process payment event"""
        payment_data = event_data.data
        return {
            'event_id': event_data.event_id,
            'type': 'payment_processed',
            'payment_id': payment_data.get('payment_id'),
            'amount': payment_data.get('amount'),
            'status': 'completed',
            'timestamp': event_data.timestamp,
        }

    def _process_signup(self, event_data: EventData) -> Dict[str, Any]:
        """Process signup event (minimal: records the user ID)"""
        return {
            'event_id': event_data.event_id,
            'type': 'signup_processed',
            'user_id': event_data.data.get('user_id'),
            'timestamp': event_data.timestamp,
        }

    def _process_generic(self, event_data: EventData) -> Dict[str, Any]:
        """Generic event processor"""
        return {
            'event_id': event_data.event_id,
            'type': 'generic_processed',
            'data': event_data.data,
            'timestamp': event_data.timestamp,
        }

    def _validate_order(self, order_data: Dict) -> bool:
        """Validate order data"""
        required_fields = ['order_id', 'user_id', 'items']
        return all(field in order_data for field in required_fields)

    def _store_result(self, result: Dict[str, Any]):
        """Store each event's result once, even if the event is redelivered"""
        # The boto3 resource API rejects Python floats; convert numbers to Decimal
        item_data = json.loads(json.dumps(result), parse_float=Decimal)
        try:
            self.table.put_item(
                Item={
                    'PK': f"EVENT#{result['event_id']}",
                    'SK': 'RESULT',  # deterministic, so a redelivered event hits the same key
                    'Data': item_data,
                    'TTL': int(time.time()) + 86400,  # epoch seconds, expires after 24 hours
                },
                # Fail instead of overwriting if this event was already stored
                ConditionExpression='attribute_not_exists(PK)',
            )
        except ClientError as e:
            # A duplicate delivery is expected under at-least-once semantics
            if e.response['Error']['Code'] != 'ConditionalCheckFailedException':
                raise

    def _generate_event_id(self) -> str:
        """Generate unique event ID"""
        return uuid.uuid4().hex


# Lambda handler instance
processor = LambdaProcessor()
handler = processor.handler
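The interview question asks for exactly-once processing. In practice, systems usually achieve this as at-least-once delivery plus idempotent processing. The stream or queue may deliver an event more than once, but a conditional write keyed on event_id ensures that only the first copy takes effect. In DynamoDB, that means a put_item with ConditionExpression `attribute_not_exists(PK)`.

The sketch below models this locally with an in-memory store. `IdempotentStore` and `process_at_least_once` are hypothetical names, not AWS APIs. Only the stored result is exactly-once here. Any external side effect inside `process`, such as charging a card, must be idempotent itself.

```python
from typing import Any, Callable, Dict, Iterable


class IdempotentStore:
    """In-memory stand-in for a table with a conditional put (attribute_not_exists)."""

    def __init__(self):
        self.items: Dict[str, Any] = {}

    def put_if_absent(self, key: str, value: Any) -> bool:
        """Write only if the key is new; return whether the write happened."""
        if key in self.items:
            return False
        self.items[key] = value
        return True


def process_at_least_once(deliveries: Iterable[dict], store: IdempotentStore,
                          process: Callable[[dict], Any]) -> int:
    """Apply `process` to each delivery; redeliveries of an event_id have no effect."""
    applied = 0
    for event in deliveries:
        result = process(event)  # must be safe to run more than once
        if store.put_if_absent(event['event_id'], result):
            applied += 1
    return applied


# e1 is delivered twice, as a retry after a timeout might do
deliveries = [
    {'event_id': 'e1', 'amount': 5},
    {'event_id': 'e2', 'amount': 7},
    {'event_id': 'e1', 'amount': 5},
]
store = IdempotentStore()
applied = process_at_least_once(deliveries, store, lambda e: e['amount'] * 2)
print(applied, store.items)
```

Replaying the same deliveries applies nothing new. This is the property that lets the pipeline retry freely.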
Step Functions State Machine
# Step Functions state machine for complex workflows
resource "aws_sfn_state_machine" "data_pipeline" {
  name     = "data-processing-pipeline"
  role_arn = aws_iam_role.step_functions.arn

  definition = jsonencode({
    Comment = "Data processing pipeline with error handling"
    StartAt = "IngestData"
    States = {
      IngestData = {
        Type     = "Task"
        Resource = aws_lambda_function.ingest.arn
        Retry = [
          {
            ErrorEquals     = ["States.TaskFailed"]
            IntervalSeconds = 2
            MaxAttempts     = 3
            BackoffRate     = 2
          }
        ]
        Catch = [
          {
            ErrorEquals = ["States.ALL"]
            Next        = "HandleError"
          }
        ]
        Next = "ValidateData"
      }
      ValidateData = {
        Type     = "Task"
        Resource = aws_lambda_function.validate.arn
        Next     = "ChoiceState"
      }
      ChoiceState = {
        Type = "Choice"
        Choices = [
          {
            Variable      = "$.isValid"
            BooleanEquals = true
            Next          = "ProcessData"
          }
        ]
        Default = "HandleValidationError"
      }
      # Every branch receives the same input; the state outputs an array of
      # branch results, in branch order
      ProcessData = {
        Type = "Parallel"
        Branches = [
          {
            StartAt = "ProcessOrder"
            States = {
              ProcessOrder = {
                Type     = "Task"
                Resource = aws_lambda_function.process_order.arn
                End      = true
              }
            }
          },
          {
            StartAt = "ProcessInventory"
            States = {
              ProcessInventory = {
                Type     = "Task"
                Resource = aws_lambda_function.process_inventory.arn
                End      = true
              }
            }
          },
          {
            StartAt = "ProcessPayment"
            States = {
              ProcessPayment = {
                Type     = "Task"
                Resource = aws_lambda_function.process_payment.arn
                End      = true
              }
            }
          }
        ]
        Next = "AggregateResults"
      }
      AggregateResults = {
        Type     = "Task"
        Resource = aws_lambda_function.aggregate.arn
        Next     = "StoreResults"
      }
      StoreResults = {
        Type     = "Task"
        Resource = aws_lambda_function.store.arn
        Next     = "SendNotification"
      }
      SendNotification = {
        Type     = "Task"
        Resource = aws_lambda_function.notify.arn
        End      = true
      }
      HandleError = {
        Type     = "Task"
        Resource = aws_lambda_function.handle_error.arn
        Next     = "FailState"
      }
      HandleValidationError = {
        Type     = "Task"
        Resource = aws_lambda_function.handle_validation_error.arn
        Next     = "FailState"
      }
      FailState = {
        Type  = "Fail"
        Cause = "Pipeline failed"
        Error = "PipelineError"
      }
    }
  })
}

# Lambda function for Step Functions
resource "aws_lambda_function" "process_order" {
  filename      = "lambda/process_order.zip"
  function_name = "process-order"
  role          = aws_iam_role.lambda.arn
  handler       = "index.handler"
  runtime       = "python3.12" # python3.9 is past end of life
  timeout       = 30
  memory_size   = 256

  environment {
    variables = {
      TABLE_NAME = aws_dynamodb_table.processed_data.name
      QUEUE_URL  = aws_sqs_queue.processing_queue.url
    }
  }

  tracing_config {
    mode = "Active"
  }
}
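The Retry block on IngestData waits longer before each retry. In Step Functions, the first retry waits IntervalSeconds. Each later retry multiplies the previous wait by BackoffRate. MaxAttempts counts retries only, not the initial attempt.

The small sketch below shows that schedule. It ignores optional caps and jitter.

```python
from typing import List


def retry_delays(interval_seconds: float, max_attempts: int, backoff_rate: float) -> List[float]:
    """Seconds waited before retries 1..max_attempts for a Step Functions Retrier."""
    return [interval_seconds * backoff_rate ** n for n in range(max_attempts)]


delays = retry_delays(interval_seconds=2, max_attempts=3, backoff_rate=2)
print(delays, sum(delays))  # [2, 4, 8] 14
```

With the IngestData values, a task that keeps failing runs four times (one attempt plus three retries). It waits about 14 seconds in total before the Catch routes it to HandleError.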
# Step Functions Lambda handlers
import json
import os
import time
from datetime import datetime, timezone
from decimal import Decimal
from typing import Any, Dict

import boto3

dynamodb = boto3.resource('dynamodb')
sns = boto3.client('sns')


def ingest_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Ingest data from various sources"""
    return {
        'statusCode': 200,
        'source': event.get('source', 'unknown'),
        'raw_data': event.get('data', {}),
        # Wall-clock time; get_remaining_time_in_millis() is time left, not a timestamp
        'ingestion_timestamp': datetime.now(timezone.utc).isoformat(),
    }


def validate_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Validate incoming data"""
    required_fields = ['id', 'type', 'timestamp']
    raw_data = event.get('raw_data', {})
    validation_errors = [
        f"Missing required field: {field}"
        for field in required_fields
        if field not in raw_data
    ]
    return {
        'isValid': len(validation_errors) == 0,  # read by ChoiceState via $.isValid
        'errors': validation_errors,
        'raw_data': raw_data,
    }


def process_order_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Process order in parallel branch"""
    order_data = event.get('raw_data', {})
    return {
        'order_id': order_data.get('order_id'),
        'status': 'processed',
        'total': sum(
            item.get('price', 0) * item.get('quantity', 0)
            for item in order_data.get('items', [])
        ),
    }


def aggregate_handler(event: Any, context) -> Dict[str, Any]:
    """Aggregate results from parallel branches"""
    # A Parallel state outputs a JSON array (one element per branch, in branch
    # order); also accept {'Results': [...]} in case a ResultPath wraps it
    results = event if isinstance(event, list) else event.get('Results', [])
    return {
        'order': results[0] if len(results) > 0 else None,
        'inventory': results[1] if len(results) > 1 else None,
        'payment': results[2] if len(results) > 2 else None,
        # Time left in this invocation, not elapsed processing time
        'remaining_time_ms': context.get_remaining_time_in_millis(),
    }


def store_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Store processed data in DynamoDB"""
    table = dynamodb.Table(os.environ.get('TABLE_NAME', 'processed-data'))
    order_id = (event.get('order') or {}).get('order_id')
    table.put_item(
        Item={
            'PK': f"ORDER#{order_id}",
            'SK': 'PROCESSED',
            # boto3 rejects floats (e.g. order totals), so convert them to Decimal
            'Data': json.loads(json.dumps(event), parse_float=Decimal),
            # TTL must be an absolute epoch time in seconds
            'TTL': int(time.time()) + 86400,
        }
    )
    return {'stored': True, 'order_id': order_id}


def notify_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Send notification about processing completion"""
    message = {
        'orderId': (event.get('order') or {}).get('order_id'),
        'status': 'completed',
    }
    sns.publish(
        TopicArn='arn:aws:sns:us-east-1:123456789012:processing-notifications',
        Message=json.dumps(message),
        Subject='Order Processing Complete',
    )
    return {'notified': True}
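The aggregate handler depends on the shape of the Parallel state's output. A local model of that state helps when unit-testing these handlers without AWS. Every branch receives the same input. The output is a list with one entry per branch, in the order the branches are declared, not the order they finish.

`run_parallel` is a hypothetical test helper, not part of any AWS SDK. It uses threads only to mimic branches running concurrently.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List


def run_parallel(branches: List[Callable[[Any], Any]], state_input: Any) -> List[Any]:
    """Run each branch on the same input and return outputs in branch order."""
    with ThreadPoolExecutor(max_workers=max(1, len(branches))) as pool:
        futures = [pool.submit(branch, state_input) for branch in branches]
        # result() re-raises a branch's exception, like a failed Parallel state
        return [future.result() for future in futures]


state_input = {'isValid': True,
               'raw_data': {'order_id': 'o-1', 'items': [{'price': 5, 'quantity': 2}]}}
branches = [
    lambda e: {'order_id': e['raw_data']['order_id'],
               'total': sum(i['price'] * i['quantity'] for i in e['raw_data']['items'])},
    lambda e: {'inventory': 'reserved'},  # hypothetical inventory result
    lambda e: {'payment': 'authorized'},  # hypothetical payment result
]
output = run_parallel(branches, state_input)
print(output)
```

If any branch raises, the helper raises too. This mirrors Step Functions, where a single failed branch fails the whole Parallel state unless a Catch handles it.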
DynamoDB Design Patterns
# DynamoDB single-table design
import json
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from typing import Any, Dict, List, Optional

import boto3
from boto3.dynamodb.conditions import Key
from botocore.exceptions import ClientError


class DynamoDBRepository:
    """DynamoDB single-table design implementation"""

    def __init__(self, table_name: str):
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(table_name)

    def create_order(self, order_data: Dict[str, Any]) -> Dict[str, Any]:
        """Create order with single-table design"""
        order_id = order_data['order_id']
        user_id = order_data['user_id']
        now = datetime.now(timezone.utc)
        created_at = now.isoformat()
        ttl = int((now + timedelta(days=30)).timestamp())
        # boto3 rejects Python floats (e.g. prices); convert them to Decimal
        order_data = json.loads(json.dumps(order_data), parse_float=Decimal)

        # batch_writer is not transactional; use transact_write_items if all
        # items must be written atomically
        with self.table.batch_writer() as batch:
            # Order item: reachable by ID, by user (GSI1) and by status (GSI2)
            batch.put_item(
                Item={
                    'PK': f"ORDER#{order_id}",
                    'SK': f"ORDER#{order_id}",
                    'GSI1PK': f"USER#{user_id}",
                    'GSI1SK': f"ORDER#{created_at}",
                    'GSI2PK': "STATUS#PENDING",
                    'GSI2SK': f"ORDER#{created_at}",
                    'DataType': 'Order',
                    'Data': order_data,
                    'TTL': ttl,
                }
            )
            # User order reference, queried on the base table with PK = USER#<id>.
            # It carries no GSI1 keys, so GSI1 queries return each order only once.
            batch.put_item(
                Item={
                    'PK': f"USER#{user_id}",
                    'SK': f"ORDER#{order_id}",
                    'DataType': 'UserOrder',
                    'TTL': ttl,
                }
            )
            # Order items
            for item in order_data.get('items', []):
                batch.put_item(
                    Item={
                        'PK': f"ORDER#{order_id}",
                        'SK': f"ITEM#{item['product_id']}",
                        'DataType': 'OrderItem',
                        'Data': item,
                        'TTL': ttl,
                    }
                )

        return {'orderId': order_id, 'createdAt': created_at}

    def get_order(self, order_id: str) -> Optional[Dict[str, Any]]:
        """Get order by ID"""
        response = self.table.get_item(
            Key={'PK': f"ORDER#{order_id}", 'SK': f"ORDER#{order_id}"}
        )
        return response.get('Item')

    def get_user_orders(self, user_id: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Get a user's most recent orders using GSI1"""
        response = self.table.query(
            IndexName='GSI1',
            KeyConditionExpression=(
                Key('GSI1PK').eq(f"USER#{user_id}")
                & Key('GSI1SK').begins_with('ORDER#')
            ),
            ScanIndexForward=False,  # newest first
            Limit=limit,
        )
        return response.get('Items', [])

    def get_pending_orders(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Get the most recent pending orders using GSI2"""
        response = self.table.query(
            IndexName='GSI2',
            KeyConditionExpression=(
                Key('GSI2PK').eq("STATUS#PENDING")
                & Key('GSI2SK').begins_with('ORDER#')
            ),
            ScanIndexForward=False,  # newest first
            Limit=limit,
        )
        return response.get('Items', [])

    def update_order_status(self, order_id: str, new_status: str) -> bool:
        """Update order status (moves the order to another GSI2 partition key)"""
        try:
            self.table.update_item(
                Key={'PK': f"ORDER#{order_id}", 'SK': f"ORDER#{order_id}"},
                UpdateExpression="SET GSI2PK = :status",
                # update_item creates missing items by default; only update existing orders
                ConditionExpression="attribute_exists(PK)",
                ExpressionAttributeValues={':status': f"STATUS#{new_status}"},
            )
            return True
        except ClientError as e:
            print(f"Error updating order: {e}")
            return False

# DynamoDB capacity calculation (provisioned mode)
# For 1M writes/sec with 1KB items:
#   WCU = 1,000,000 (each write of up to 1 KB costs 1 WCU)
# For 1M reads/sec with 1KB items:
#   RCU = 1,000,000 for strongly consistent reads (1 RCU per read of up to 4 KB)
#   RCU = 500,000 for eventually consistent reads (0.5 RCU per read)
# Cost estimation (us-east-1 provisioned list prices at time of writing; check current pricing):
#   WCU cost = 1,000,000 × $0.00065 = $650/hour
#   RCU cost = 500,000 × $0.00013 = $65/hour (eventually consistent)
#   Total hourly cost = $715
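⚠️ DynamoDB Design
Single-table design is a common pattern for DynamoDB at scale: composite primary keys plus GSIs let one table serve several access patterns. Throughput, however, depends on how evenly keys distribute load. Each partition serves at most 1,000 WCU and 3,000 RCU per second, so a low-cardinality key such as STATUS#PENDING on GSI2 becomes a hot partition at 1M writes/sec.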
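`create_order` writes the same GSI2 partition key, STATUS#PENDING, for every pending order. As a result, all of those index writes land on a single key value.

Write sharding is a common mitigation. Each write gets a deterministic suffix, so one logical key spreads across N partition key values. Readers then query all N keys and merge the results. The helpers below are a hypothetical sketch. The shard count of 10 is an arbitrary example; choose it based on the write rate per logical key.

```python
import hashlib
from typing import List


def sharded_key(base: str, entity_id: str, shard_count: int) -> str:
    """Deterministic shard suffix: the same entity always maps to the same key."""
    digest = hashlib.sha256(entity_id.encode()).hexdigest()
    return f"{base}#{int(digest, 16) % shard_count}"


def all_shard_keys(base: str, shard_count: int) -> List[str]:
    """Every key a reader must query (then merge) to see the whole logical key."""
    return [f"{base}#{i}" for i in range(shard_count)]


key = sharded_key("STATUS#PENDING", "order-123", shard_count=10)
print(key, key in all_shard_keys("STATUS#PENDING", 10))
```

Deterministic suffixes keep single-item access cheap, because the key can be recomputed from the ID. Random suffixes spread load just as well, but force every reader to query all shards.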
EventBridge Integration
# EventBridge publisher
import json
from datetime import datetime, timezone

import boto3


class EventBridgePublisher:
    """EventBridge event publisher"""

    def __init__(self, event_bus_name: str = 'default'):
        self.eventbridge = boto3.client('events')
        self.event_bus_name = event_bus_name

    def publish_event(self, detail_type: str, detail: dict, source: str = 'custom.app'):
        """Publish event to EventBridge"""
        response = self.eventbridge.put_events(
            Entries=[
                {
                    'Source': source,
                    'DetailType': detail_type,
                    'Detail': json.dumps(detail),
                    'EventBusName': self.event_bus_name,
                    'Time': datetime.now(timezone.utc),
                    'Resources': [],
                }
            ]
        )
        # put_events reports per-entry failures without raising; surface them
        if response['FailedEntryCount']:
            raise RuntimeError(f"EventBridge rejected events: {response['Entries']}")
        return response

    def publish_order_event(self, order_data: dict):
        """Publish order-related event"""
        return self.publish_event(
            detail_type='OrderCreated',
            detail=order_data,
            source='custom.orders',
        )

    def publish_payment_event(self, payment_data: dict):
        """Publish payment-related event"""
        return self.publish_event(
            detail_type='PaymentProcessed',
            detail=payment_data,
            source='custom.payments',
        )
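EventBridge invokes a rule's targets only for events that match the rule's pattern, such as the OrderCreated pattern in this section. Matching follows three rules:

- Every field named in the pattern must be present in the event.
- A list of values means "any of these".
- Nested objects such as `detail` are matched field by field.

The local sketch below implements only that subset, with exact matching, to help unit-test patterns. It is not the EventBridge engine and ignores content filters such as prefix, numeric and anything-but. Note that the `DetailType` passed to put_events appears as `detail-type` in the delivered event.

```python
from typing import Any, Dict


def matches(pattern: Dict[str, Any], event: Dict[str, Any]) -> bool:
    """Simplified EventBridge matching: exact values only, AND across fields."""
    for field, expected in pattern.items():
        if field not in event:
            return False
        actual = event[field]
        if isinstance(expected, dict):
            # Nested pattern: recurse into the event's object
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif isinstance(actual, list):
            # An array in the event matches if any element is in the pattern list
            if not any(value in expected for value in actual):
                return False
        elif actual not in expected:
            return False
    return True


rule = {"source": ["custom.orders"], "detail-type": ["OrderCreated"],
        "detail": {"status": ["confirmed"]}}
event = {"source": "custom.orders", "detail-type": "OrderCreated",
         "detail": {"order_id": "o-1", "status": "confirmed"}}
print(matches(rule, event))
```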
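# EventBridge rule for routing: matches events from publish_order_event whose
# order data carries "status": "confirmed"
"""
{
  "source": ["custom.orders"],
  "detail-type": ["OrderCreated"],
  "detail": {
    "status": ["confirmed"]
  }
}
"""

# Step Functions integration with EventBridge.
# A Wait state only pauses for a fixed time (Seconds, Timestamp, SecondsPath,
# TimestampPath); it cannot wait for an event pattern. To run a workflow per
# matching event, add the state machine as a target of the rule above, so each
# event starts an execution with the event as its input. To emit events from
# inside a workflow, use the events:putEvents service integration
# ("OrderProcessed" is a hypothetical detail type):
"""
{
  "StartAt": "PublishOrderProcessed",
  "States": {
    "PublishOrderProcessed": {
      "Type": "Task",
      "Resource": "arn:aws:states:::events:putEvents",
      "Parameters": {
        "Entries": [
          {
            "Source": "custom.orders",
            "DetailType": "OrderProcessed",
            "Detail.$": "$"
          }
        ]
      },
      "End": true
    }
  }
}
"""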
Kinesis Data Streams
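# Kinesis producer and consumer
import hashlib
import json
import time
from typing import Any, Dict, List, Optional

import boto3


class KinesisProducer:
    """Kinesis data stream producer"""

    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name

    def put_record(self, data: Dict[str, Any], partition_key: Optional[str] = None):
        """Put single record"""
        payload = json.dumps(data).encode()
        if partition_key is None:
            # Hashing the payload spreads records across shards (no per-key ordering)
            partition_key = hashlib.md5(payload).hexdigest()
        return self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=payload,
            PartitionKey=partition_key,
        )

    def put_records(self, records: List[Dict[str, Any]], max_retries: int = 3):
        """Put multiple records, resending any that fail (e.g. throttled)"""
        kinesis_records = []
        for record in records:
            payload = json.dumps(record).encode()
            kinesis_records.append({
                'Data': payload,
                'PartitionKey': hashlib.md5(payload).hexdigest(),
            })

        batch_size = 500  # PutRecords accepts at most 500 records per call
        responses = []
        for i in range(0, len(kinesis_records), batch_size):
            batch = kinesis_records[i:i + batch_size]
            for attempt in range(max_retries + 1):
                response = self.kinesis.put_records(StreamName=self.stream_name, Records=batch)
                responses.append(response)
                if response['FailedRecordCount'] == 0:
                    break
                # The call succeeds even when individual records fail; keep only
                # the failed ones (their result entries carry an ErrorCode)
                batch = [rec for rec, res in zip(batch, response['Records'])
                         if 'ErrorCode' in res]
                time.sleep(0.1 * 2 ** attempt)  # exponential backoff
            else:
                raise RuntimeError(f"{len(batch)} records failed after {max_retries} retries")
        # Producer retries (including SDK retries after timeouts) can write a
        # record twice, so consumers must deduplicate
        return responses


class KinesisConsumer:
    """Polling consumer using GetRecords (read throughput shared per shard).

    Enhanced fan-out would instead register consumer_name with
    register_stream_consumer and read via subscribe_to_shard; production
    consumers usually rely on the KCL or a Lambda trigger.
    """

    def __init__(self, stream_name: str, consumer_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name
        self.consumer_name = consumer_name  # only used with enhanced fan-out

    def list_shard_ids(self) -> List[str]:
        """List all shard IDs, following pagination"""
        kwargs = {'StreamName': self.stream_name}
        shard_ids = []
        while True:
            response = self.kinesis.list_shards(**kwargs)
            shard_ids.extend(shard['ShardId'] for shard in response['Shards'])
            if 'NextToken' not in response:
                return shard_ids
            # StreamName must be omitted when paging with NextToken
            kwargs = {'NextToken': response['NextToken']}

    def get_shard_iterator(self, shard_id: str, iterator_type: str = 'TRIM_HORIZON'):
        """Get shard iterator (TRIM_HORIZON starts at the oldest retained record)"""
        response = self.kinesis.get_shard_iterator(
            StreamName=self.stream_name,
            ShardId=shard_id,
            ShardIteratorType=iterator_type,
        )
        return response['ShardIterator']

    def get_records(self, shard_iterator: str, limit: int = 100):
        """Get one batch of records plus the iterator for the next call"""
        response = self.kinesis.get_records(ShardIterator=shard_iterator, Limit=limit)
        return (
            response['Records'],
            response.get('NextShardIterator'),
            response.get('MillisBehindLatest', 0),
        )

    def process_stream(self):
        """Read every shard until caught up (a long-running consumer keeps polling)"""
        for shard_id in self.list_shard_ids():
            shard_iterator = self.get_shard_iterator(shard_id)
            while shard_iterator:  # None once a closed shard is fully read
                records, shard_iterator, millis_behind = self.get_records(shard_iterator)
                for record in records:
                    self.process_record(json.loads(record['Data']))
                if not records and millis_behind == 0:
                    break  # reached the tip of this shard
                time.sleep(0.2)  # GetRecords is limited to 5 calls/sec per shard

    def process_record(self, record: Dict[str, Any]):
        """Process individual record"""
        print(f"Processing record: {record}")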
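In provisioned mode, each shard accepts writes of up to 1,000 records or 1 MB per second. Whichever limit binds first sets the shard count. The sketch below assumes 1 MB = 1,048,576 bytes.

```python
import math

RECORDS_PER_SHARD_PER_SEC = 1_000
BYTES_PER_SHARD_PER_SEC = 1024 * 1024  # 1 MB/s write limit per shard


def shards_needed(records_per_sec: int, avg_record_bytes: int) -> int:
    """Shard count needed so neither write limit (records or bytes) is exceeded."""
    by_records = math.ceil(records_per_sec / RECORDS_PER_SHARD_PER_SEC)
    by_bytes = math.ceil(records_per_sec * avg_record_bytes / BYTES_PER_SHARD_PER_SEC)
    return max(by_records, by_bytes)


print(shards_needed(1_000_000, 1024))     # 1000 (record-rate bound)
print(shards_needed(100_000, 10 * 1024))  # 977 (byte-rate bound)
```

For the 1M events/sec, 1 KB target, the record rate is the binding limit. Read throughput is capped separately. With shared GetRecords consumers, each shard allows 2 MB/s in total. With enhanced fan-out, each consumer gets 2 MB/s per shard.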
✅ Serverless Best Practices
Use provisioned concurrency for critical functions, implement dead letter queues, and monitor cold start metrics. Consider Lambda layers for shared dependencies.
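With a Kinesis trigger, a single bad record makes Lambda retry the whole batch by default, which blocks that shard. Enabling `ReportBatchItemFailures` on the event source mapping changes this. The handler returns the failing record's sequence number, and Lambda retries from that record. An on-failure destination (SQS or SNS) then plays the dead-letter role once retries are exhausted.

In the sketch below, `process` is a hypothetical stand-in for business logic that rejects events without an `event_id`.

```python
import base64
import json
from typing import Any, Dict, List


def process(payload: Dict[str, Any]) -> None:
    """Hypothetical business logic: reject events without an ID."""
    if 'event_id' not in payload:
        raise ValueError("missing event_id")


def handler(event: Dict[str, Any], context=None) -> Dict[str, List[Dict[str, str]]]:
    """Kinesis-triggered handler that reports the first failed record."""
    for record in event['Records']:
        try:
            payload = json.loads(base64.b64decode(record['kinesis']['data']))
            process(payload)
        except Exception:
            # Lambda resumes from this sequence number, so later records in the
            # batch are retried anyway; stopping here preserves per-shard order
            return {'batchItemFailures': [{'itemIdentifier': record['kinesis']['sequenceNumber']}]}
    return {'batchItemFailures': []}
```

Because the records after the failure are delivered again, this pattern relies on idempotent processing, just like the rest of the pipeline.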
Summary
| Component | Purpose | Configuration |
|---|---|---|
| Lambda | Serverless compute | Memory, timeout, concurrency |
| Step Functions | Workflow orchestration | State machines, parallel execution |
| DynamoDB | NoSQL database | Single-table design, GSIs |
| EventBridge | Event routing | Rules, patterns, targets |
| Kinesis | Stream processing | Shards, consumers, producers |