Lambda Architecture for Data Processing
```text
AWS LAMBDA ARCHITECTURE FOR DATA PROCESSING

EVENT SOURCES (Triggers)
  • S3 (Put/Copy)           • API Gateway (REST)
  • Kinesis (Records)       • EventBridge (Events)
  • DynamoDB (Streams)      • SQS/SNS (Messages)
  • CloudWatch (Events)     • Step Functions (Orchestration)
            │
            ▼
LAMBDA FUNCTION
  Runtime Environment
    • Managed runtimes for Python, Node.js, Java, .NET, and Ruby;
      Go and Rust run on the OS-only runtime (provided.al2023)
    • 128 MB - 10 GB (10,240 MB) memory
    • Up to 6 vCPUs, allocated in proportion to memory
    • Package: 50 MB zipped (direct upload), 250 MB unzipped;
      up to 10 GB as a container image
  Execution Model
    • Event-driven (no charge while idle)
    • Auto-scaling (0 to 1000s of concurrent executions)
    • Stateless between invocations; /tmp is ephemeral (512 MB - 10 GB)
    • Timeout: 15 minutes max
    • Cold start: typically 100 ms - 10 s
            │
            ├──► Amazon S3 (Data Lake)
            ├──► DynamoDB (Metadata)
            └──► Other AWS Services
```
Lambda Function Code Examples
S3 Trigger Data Processing
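```python
import json
from datetime import datetime, timezone
from io import BytesIO
from urllib.parse import unquote_plus

import boto3
import pandas as pd  # needs pandas + pyarrow in the package or a Layer

s3 = boto3.client('s3')  # created once per execution environment, reused across invocations

LANDING_PREFIX = 'landing/'
PROCESSED_PREFIX = 'processed/'


def lambda_handler(event, context):
    """Process files uploaded to the S3 landing zone."""
    processed_files = 0
    # Process each record from the S3 event
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        # Keys in S3 event notifications are URL-encoded (e.g. spaces arrive as '+')
        key = unquote_plus(record['s3']['object']['key'])
        size = record['s3']['object']['size']

        # Skip anything outside the landing zone, including our own output,
        # to avoid a recursive trigger loop (also scope the notification to the prefix)
        if not key.startswith(LANDING_PREFIX):
            print(f"Skipping s3://{bucket}/{key}: outside {LANDING_PREFIX}")
            continue

        print(f"Processing s3://{bucket}/{key} ({size} bytes)")

        # Download the file; the whole object is held in memory
        response = s3.get_object(Bucket=bucket, Key=key)
        file_content = response['Body'].read()

        # Parse based on file type
        if key.endswith('.csv'):
            df = pd.read_csv(BytesIO(file_content))
        elif key.endswith('.json'):
            df = pd.read_json(BytesIO(file_content))
        elif key.endswith('.parquet'):
            df = pd.read_parquet(BytesIO(file_content))
        else:
            raise ValueError(f"Unsupported file type: {key}")

        # Transform data
        df = transform_data(df)

        # Write Parquet to the processed zone, swapping the extension to .parquet
        base_name = key[len(LANDING_PREFIX):].rsplit('.', 1)[0]
        processed_key = f"{PROCESSED_PREFIX}{base_name}.parquet"
        output_buffer = BytesIO()
        df.to_parquet(output_buffer, index=False, compression='snappy')
        s3.put_object(
            Bucket=bucket,
            Key=processed_key,
            Body=output_buffer.getvalue(),
            Metadata={  # S3 user metadata values must be strings
                'source': key,
                'processed_at': datetime.now(timezone.utc).isoformat(),
                'row_count': str(len(df)),
            },
        )
        print(f"Processed {len(df)} rows → s3://{bucket}/{processed_key}")
        processed_files += 1

    return {
        'statusCode': 200,
        'body': json.dumps(f'Processed {processed_files} files'),
    }


def transform_data(df):
    """Apply data transformations."""
    now = datetime.now(timezone.utc)
    # Clean column names
    df.columns = [str(col).strip().lower().replace(' ', '_') for col in df.columns]
    # Remove duplicates
    df = df.drop_duplicates()
    # Fill nulls in known columns (keys missing from df are ignored)
    df = df.fillna({
        'amount': 0,
        'status': 'unknown',
        'created_at': now.isoformat(),
    })
    # Add metadata columns (one timestamp so processed_at and batch_id agree)
    df['processed_at'] = now
    df['batch_id'] = f"batch_{now.strftime('%Y%m%d_%H%M%S')}"
    return df
```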
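The handler only reads a few event fields, so you can exercise it locally with a minimal S3 event. The sketch below builds a `ObjectCreated:Put` event and decodes it the same way the handler does. `make_s3_put_event` and `parse_s3_records` are hypothetical helpers, the bucket name is a placeholder, and `quote_plus` only approximates how S3 URL-encodes keys in notifications (spaces arrive as `+`). Real notifications carry many more fields.

```python
from urllib.parse import quote_plus, unquote_plus


def make_s3_put_event(bucket, key, size):
    """Build a minimal S3 ObjectCreated:Put event (hypothetical test helper)."""
    return {
        'Records': [{
            'eventSource': 'aws:s3',
            'eventName': 'ObjectCreated:Put',
            's3': {
                'bucket': {'name': bucket},
                # Notifications URL-encode the key; quote_plus approximates that
                'object': {'key': quote_plus(key, safe='/'), 'size': size},
            },
        }]
    }


def parse_s3_records(event):
    """Yield (bucket, decoded_key, size) for each record, as the handler reads them."""
    for record in event['Records']:
        s3_info = record['s3']
        yield (
            s3_info['bucket']['name'],
            unquote_plus(s3_info['object']['key']),
            s3_info['object']['size'],
        )


event = make_s3_put_event('example-data-lake', 'landing/sales report (1).csv', 2048)
print(event['Records'][0]['s3']['object']['key'])  # landing/sales+report+%281%29.csv
print(list(parse_s3_records(event)))  # [('example-data-lake', 'landing/sales report (1).csv', 2048)]
```

If you pass the same `event` to `lambda_handler`, the object must exist in S3, or the S3 client must be stubbed.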
Kinesis Stream Processing
```python
import base64
import json
from datetime import datetime, timezone
from decimal import Decimal

import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('stream-processor-state')

TTL_SECONDS = 7 * 24 * 3600  # expire state after 7 days


def lambda_handler(event, context):
    """Process Kinesis stream records with partial batch responses.

    Assumes the event source mapping sets FunctionResponseTypes=['ReportBatchItemFailures'].
    Without it, the return value is ignored and the whole batch counts as successful.
    """
    for record in event['Records']:
        sequence_number = record['kinesis']['sequenceNumber']
        try:
            # Kinesis data arrives base64-encoded
            payload = base64.b64decode(record['kinesis']['data'])
            data = json.loads(payload)

            # Process the record
            result = process_record(data)

            # Store result in DynamoDB
            now = datetime.now(timezone.utc)
            table.put_item(Item={
                'event_id': data['event_id'],
                'processed_at': now.isoformat(),
                'result': result,
                'ttl': int(now.timestamp()) + TTL_SECONDS,  # epoch seconds for DynamoDB TTL
            })
        except Exception as e:
            print(f"Error processing record {sequence_number}: {e}")
            # Report the first failure. Lambda checkpoints before it and retries
            # from this record, so later records are not processed twice in this run.
            return {'batchItemFailures': [{'itemIdentifier': sequence_number}]}

    return {'batchItemFailures': []}


def process_record(data):
    """Validate and enrich one stream record."""
    return {
        'event_type': data.get('event_type'),
        'user_id': data.get('user_id'),
        # boto3's DynamoDB resource rejects float; store numbers as Decimal
        'amount': Decimal(str(data.get('amount', 0))),
        'status': 'processed',
        'validation_score': calculate_validation_score(data),
    }


def calculate_validation_score(data):
    """Data quality score: 25 points per populated required field (0-100)."""
    required_fields = ['event_id', 'event_type', 'user_id', 'amount']
    return sum(25 for field in required_fields if data.get(field))
```
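Lambda delivers each Kinesis payload base64-encoded under `record['kinesis']['data']`, which is why the handler decodes before parsing JSON. The sketch below does the reverse, which is handy for building local test events. `make_kinesis_event` is a hypothetical helper. The sequence numbers are made up: real ones are long numeric strings that increase within a shard.

```python
import base64
import json


def make_kinesis_event(payloads, shard_id='shardId-000000000000'):
    """Build a minimal Kinesis event for local tests (hypothetical helper).

    Each payload is JSON-encoded and then base64-encoded, matching how Lambda
    delivers Kinesis data. Sequence numbers are fake but increasing.
    """
    records = []
    for i, payload in enumerate(payloads):
        sequence_number = str(1000 + i)
        records.append({
            'eventSource': 'aws:kinesis',
            'eventID': f'{shard_id}:{sequence_number}',
            'kinesis': {
                'partitionKey': str(payload.get('user_id', i)),
                'sequenceNumber': sequence_number,
                'data': base64.b64encode(json.dumps(payload).encode('utf-8')).decode('ascii'),
            },
        })
    return {'Records': records}


event = make_kinesis_event([
    {'event_id': 'e-1', 'event_type': 'purchase', 'user_id': 'u-7', 'amount': 19.99},
    {'event_id': 'e-2', 'event_type': 'refund', 'user_id': 'u-7', 'amount': 5},
])
first = event['Records'][0]['kinesis']
print(json.loads(base64.b64decode(first['data']))['event_id'])  # e-1
```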
DynamoDB Streams CDC
```python
from boto3.dynamodb.types import TypeDeserializer

_deserializer = TypeDeserializer()


def lambda_handler(event, context):
    """Process DynamoDB stream records for CDC.

    NewImage/OldImage are included only if the stream view type provides them
    (NEW_IMAGE, OLD_IMAGE, or NEW_AND_OLD_IMAGES); Keys is always present.
    """
    for record in event['Records']:
        event_name = record['eventName']  # INSERT, MODIFY, REMOVE
        if event_name == 'INSERT':
            handle_insert(record)
        elif event_name == 'MODIFY':
            handle_modify(record)
        elif event_name == 'REMOVE':
            handle_remove(record)
    return {'statusCode': 200}


def handle_insert(record):
    """Handle new record insertion."""
    item = deserialize_dynamodb(record['dynamodb']['NewImage'])
    print(f"New item inserted: {item['id']}")  # assumes the partition key is 'id'
    # Trigger downstream processing, e.g. send to Kinesis or update a search index


def handle_modify(record):
    """Handle record modification."""
    old_item = deserialize_dynamodb(record['dynamodb'].get('OldImage', {}))
    new_item = deserialize_dynamodb(record['dynamodb']['NewImage'])
    # Detect specific field changes
    changed_fields = detect_changes(old_item, new_item)
    print(f"Item {new_item['id']} modified: {changed_fields}")


def handle_remove(record):
    """Handle record deletion."""
    # Fall back to Keys when the stream does not carry OldImage
    old_image = record['dynamodb'].get('OldImage') or record['dynamodb']['Keys']
    item = deserialize_dynamodb(old_image)
    print(f"Item deleted: {item['id']}")


def deserialize_dynamodb(image):
    """Convert a DynamoDB-typed image, e.g. {'id': {'S': '42'}}, into a plain dict.

    TypeDeserializer handles every attribute type (S, N, B, BOOL, NULL, M, L, SS, NS, BS);
    numbers come back as Decimal.
    """
    return {key: _deserializer.deserialize(value) for key, value in image.items()}


def detect_changes(old, new):
    """Return the sorted names of fields whose values differ."""
    return sorted(key for key in old.keys() | new.keys() if old.get(key) != new.get(key))
```
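When the CDC stream feeds an audit log or another system, it often helps to emit before/after values rather than only the changed field names. The sketch below assumes the items have already been deserialized into plain dicts, for example by `deserialize_dynamodb`. The change-event schema and `build_change_event` are hypothetical.

```python
from datetime import datetime, timezone


def build_change_event(event_name, keys, old_item, new_item):
    """Flatten one deserialized stream record into a change event (hypothetical schema).

    Only fields whose values differ are kept, each with its before/after value.
    Missing fields show up as None (e.g. every field on INSERT has old=None).
    """
    changes = {
        field: {'old': old_item.get(field), 'new': new_item.get(field)}
        for field in sorted(old_item.keys() | new_item.keys())
        if old_item.get(field) != new_item.get(field)
    }
    return {
        'op': event_name,  # INSERT, MODIFY, or REMOVE
        'keys': keys,      # primary key of the changed item
        'changes': changes,
        'emitted_at': datetime.now(timezone.utc).isoformat(),
    }


change = build_change_event(
    'MODIFY',
    {'id': '42'},
    {'id': '42', 'status': 'pending', 'amount': 10},
    {'id': '42', 'status': 'shipped', 'amount': 10},
)
print(change['changes'])  # {'status': {'old': 'pending', 'new': 'shipped'}}
```

Inside `handle_modify`, the `keys` argument could come from `deserialize_dynamodb(record['dynamodb']['Keys'])`. This sketch cannot tell an attribute explicitly set to null apart from a missing one.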
Lambda Concurrency and Throttling
```text
LAMBDA CONCURRENCY MODEL

ACCOUNT LEVEL
  Default: 1,000 concurrent executions per Region (soft limit)
  Can be increased to 10,000+ via a Service Quotas request

FUNCTION LEVEL
  Provisioned Concurrency
    • Pre-initialized execution environments
    • No cold starts for invocations served by provisioned environments
    • Cost: $0.0000041667 per GB-second provisioned (us-east-1, x86),
      plus request and duration charges
    • Use case: Latency-sensitive data processing
  Reserved Concurrency
    • Reserves part of the account pool for the function and caps it at that value
    • Prevents one function from consuming the whole account quota
    • No additional cost
    • Use case: Critical data pipelines; protecting downstream systems

THROTTLING BEHAVIOR
  Sync invocations:
    • Throttled immediately
    • Returns HTTP 429 (TooManyRequestsException)
    • Client should retry with backoff
  Async invocations:
    • Throttled events stay queued and are retried for up to 6 hours
    • Function errors are retried up to 2 times (configurable 0-2)
    • Then routed to a dead-letter queue or on-failure destination (if configured)
    • Otherwise the event is discarded
  Event source mappings:
    • Streams (Kinesis, DynamoDB): retry until success, record expiry,
      or MaximumRetryAttempts is reached
    • SQS: failed messages become visible again after the visibility timeout
      until the queue's redrive policy moves them to a DLQ
    • Batch size and batching window control throughput
```
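For synchronous callers, "retry with backoff" usually means capped exponential backoff with jitter, so that throttled clients do not all retry at the same moment. The sketch below uses the "full jitter" variant, which sleeps a random amount between 0 and the current cap. `ThrottledError` and `call_with_backoff` are hypothetical; map whatever exception your client raises on HTTP 429 onto `ThrottledError`. The AWS SDKs, boto3 included, already retry many throttling errors with built-in backoff, so this pattern matters mostly for custom clients.

```python
import random
import time


class ThrottledError(Exception):
    """Hypothetical stand-in for a 429 / TooManyRequestsException from your client."""


def call_with_backoff(fn, max_attempts=5, base_delay=0.1, max_delay=5.0,
                      sleep=time.sleep, rng=random.random):
    """Call fn(), retrying throttles with capped exponential backoff and full jitter.

    Before retry n (0-based) it sleeps rng() * min(max_delay, base_delay * 2**n),
    i.e. a uniform random delay between 0 and the current cap.
    """
    for attempt in range(max_attempts):
        try:
            return fn()
        except ThrottledError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: let the caller see the throttle
            cap = min(max_delay, base_delay * (2 ** attempt))
            sleep(rng() * cap)
```

The `sleep` and `rng` parameters exist only so the delays can be tested deterministically. In production you would keep the defaults.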
Lambda Limits and Best Practices
Lambda Limits
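| Resource | Limit | Adjustable |
|---|---|---|
| Memory | 128 MB - 10,240 MB (per-function setting) | No |
| Timeout | 900 seconds (15 min) | No |
| Deployment Package | 50 MB zipped (direct upload); 250 MB unzipped, including layers; 10 GB container image | No |
| Ephemeral Storage (/tmp) | 512 MB default, configurable up to 10,240 MB per function | No (maximum is fixed) |
| Concurrent Executions | 1,000 per Region | Yes (via Service Quotas) |
| File Descriptors | 1,024 | No |
| Environment Variables | 4 KB total | No |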
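To judge whether a workload will approach the 1,000-execution default, a common rule of thumb is concurrency ≈ average requests per second × average duration in seconds (Little's law). For Kinesis and DynamoDB stream mappings, concurrency is bounded instead by shards × parallelization factor. The sketch below computes both estimates; the function names are hypothetical.

```python
import math


def estimate_concurrency(requests_per_second, avg_duration_seconds):
    """Steady-state concurrency ≈ arrival rate × average duration (Little's law)."""
    return math.ceil(requests_per_second * avg_duration_seconds)


def stream_concurrency(shards, parallelization_factor=1):
    """Upper bound for a Kinesis/DynamoDB stream mapping:
    one concurrent batch per shard per parallelization slot (factor is 1-10)."""
    if not 1 <= parallelization_factor <= 10:
        raise ValueError('parallelization_factor must be between 1 and 10')
    return shards * parallelization_factor


print(estimate_concurrency(500, 1.5))  # 750
print(stream_concurrency(40, 10))      # 400
```

Bursty traffic can exceed the steady-state figure, so treat the result as a lower bound when sizing quotas or reserved concurrency.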
Best Practices for Data Processing
ℹ️
Pro Tip: For large data files (>1 GB), use Step Functions to orchestrate multiple Lambda invocations, or use AWS Batch/EMR instead. Each Lambda invocation is limited to 15 minutes.
```text
LAMBDA BEST PRACTICES FOR DATA ENGINEERING

1. OPTIMIZE COLD STARTS
   ├── Use Python/Node.js (faster cold starts)
   ├── Minimize deployment package size
   ├── Share dependencies via Lambda Layers (they still count toward the 250 MB limit)
   └── Consider Provisioned Concurrency for critical paths

2. HANDLE RETRIES PROPERLY
   ├── Implement idempotent handlers
   ├── Use DynamoDB for deduplication
   ├── Configure dead-letter queues
   └── Set maximum retry attempts

3. MANAGE CONNECTIONS
   ├── Use connection pooling (RDS Proxy)
   ├── Create clients/connections outside the handler and reuse them
   ├── Use VPC Endpoints for AWS services
   └── Implement connection timeouts

4. MONITOR AND LOG
   ├── Use CloudWatch Logs Insights for log analysis
   ├── Create custom metrics (put_metric_data)
   ├── Set up alarms for throttles and errors
   └── Use X-Ray for tracing

5. COST OPTIMIZATION
   ├── Right-size memory (CPU scales with memory)
   ├── Use arm64 (Graviton2): about 20% lower price per GB-second
   ├── Minimize execution time
   └── Use provisioned concurrency only where latency requires it
```
Data Pipeline Patterns with Lambda
Pattern 1: Fan-Out/Fan-In
```text
FAN-OUT/FAN-IN PATTERN

     Event ──► Lambda (Orchestrator)
                         │
     ┌─────────┬─────────┼─────────┬─────────┐
     ▼         ▼         ▼         ▼         ▼
 Worker 1  Worker 2  Worker 3  Worker 4  Worker 5   (Lambda)
     │         │         │         │         │
     ▼         ▼         ▼         ▼         ▼
  Part 1    Part 2    Part 3    Part 4    Part 5    (S3)
     │         │         │         │         │
     └─────────┴─────────┼─────────┴─────────┘
                         ▼
                Lambda (Aggregator)
                         │
                         ▼
                  Result (S3/DDB)
```
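One way to implement the fan-out step for a single large object is to have the orchestrator split it into byte ranges and give each worker one range. A worker can then fetch its range with the `Range` parameter of S3 `GetObject`. The sketch below only plans the ranges and the worker payloads. The payload schema and function names are hypothetical, and the sketch ignores record boundaries: a worker processing CSV or JSON Lines would need to skip ahead to the first complete line in its range.

```python
def plan_byte_ranges(object_size, chunk_size):
    """Split an object of object_size bytes into inclusive (start, end) ranges.

    Each tuple maps to an HTTP Range header 'bytes=start-end'.
    """
    if object_size < 0 or chunk_size <= 0:
        raise ValueError('object_size must be >= 0 and chunk_size > 0')
    return [
        (start, min(start + chunk_size, object_size) - 1)
        for start in range(0, object_size, chunk_size)
    ]


def build_worker_payloads(bucket, key, object_size, chunk_size):
    """One invocation payload per worker (hypothetical payload schema)."""
    return [
        {'bucket': bucket, 'key': key, 'part': i, 'range': f'bytes={start}-{end}'}
        for i, (start, end) in enumerate(plan_byte_ranges(object_size, chunk_size))
    ]


print(plan_byte_ranges(10, 4))  # [(0, 3), (4, 7), (8, 9)]
```

The orchestrator could invoke one worker per payload asynchronously, or pass the list to a Step Functions Map state. The aggregator then combines the per-part outputs.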
Pattern 2: Streaming ETL
```text
STREAMING ETL WITH LAMBDA

Kinesis ──► Lambda ──► S3 (Raw) ──► Glue ──► S3 (Processed)

Lambda Configuration (event source mapping):
  • Batch Size: 100-1,000 records (BatchSize)
  • Batch Window: 60 seconds (MaximumBatchingWindowInSeconds)
  • Parallelization Factor: 10 (concurrent batches per shard; the maximum)

Error Handling:
  • On failure: the batch is retried until it succeeds, the records expire,
    or MaximumRetryAttempts is reached (BisectBatchOnFunctionError can split it)
  • After max retries: an on-failure destination (SQS or SNS) receives
    metadata about the failed batch (shard ID, sequence number range)
  • A reprocessing Lambda reads that metadata and re-fetches the records
    from the stream
```
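The configuration above corresponds to parameters of the Lambda `CreateEventSourceMapping` API. The sketch below assembles them as keyword arguments for boto3's `create_event_source_mapping`. The function name and ARNs are placeholders, and the values are one possible choice within the ranges described. The sketch also enables `ReportBatchItemFailures` for partial batch responses and `BisectBatchOnFunctionError` to help isolate bad records. The actual API call is commented out so the block runs without AWS credentials.

```python
def kinesis_mapping_config(function_name, stream_arn, on_failure_arn):
    """Keyword arguments for boto3's lambda client create_event_source_mapping.

    Values are one choice within the streaming-ETL ranges; the names and ARNs
    passed in are placeholders.
    """
    return {
        'FunctionName': function_name,
        'EventSourceArn': stream_arn,
        'StartingPosition': 'LATEST',            # or 'TRIM_HORIZON' to start from the oldest record
        'BatchSize': 500,                        # records per invocation (max 10,000)
        'MaximumBatchingWindowInSeconds': 60,    # wait up to 60 s to fill a batch
        'ParallelizationFactor': 10,             # concurrent batches per shard (1-10)
        'MaximumRetryAttempts': 3,               # after that, batch metadata goes to OnFailure
        'BisectBatchOnFunctionError': True,      # split a failing batch to isolate bad records
        'FunctionResponseTypes': ['ReportBatchItemFailures'],
        'DestinationConfig': {'OnFailure': {'Destination': on_failure_arn}},
    }


config = kinesis_mapping_config(
    'stream-etl',
    'arn:aws:kinesis:us-east-1:123456789012:stream/clickstream',
    'arn:aws:sqs:us-east-1:123456789012:stream-etl-failures',
)
# With credentials configured:
# import boto3
# boto3.client('lambda').create_event_source_mapping(**config)
```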
Interview Questions & Answers
Q1: When should you use Lambda vs. AWS Batch for data processing?
Answer:
- Lambda: Event-driven, <15 min per invocation, data that fits in memory or /tmp (each up to 10 GB), near-real-time
- AWS Batch: Long-running jobs, large datasets, custom environments
Lambda is ideal for stream processing, file transformations, and API integrations. Batch is better for ETL jobs that take hours or process terabytes.
Q2: How do you handle cold starts in Lambda?
Answer:
- Provisioned Concurrency: Pre-warm instances for critical paths
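- Optimize Package: Minimize deployment size and dependencies (Lambda Layers help share dependencies but do not shrink what must be loaded); initialize SDK clients outside the handler so warm invocations reuse them
- Keep-Alive: Scheduled events can keep a few environments warm, but this does not cover concurrent scale-out; Provisioned Concurrency is the reliable option
- Runtime Choice: Python/Node.js have faster cold starts than Java/.NET; for Java, SnapStart can cut initialization time
- VPC: VPC attachment no longer adds the old 1-2 s network-interface setup to each cold start (interfaces are now created when the function is configured), so attach to a VPC whenever you need private resources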
Q3: What is the maximum batch size for Kinesis with Lambda?
Answer:
- Default: 100 records
- Maximum: 10,000 records
- Maximum Batch Window: 300 seconds (5 minutes)
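- Best Practice: Start with 100-500 and tune based on processing time; each invocation payload is also capped at 6 MB, which can limit the effective batch size for large records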
Q4: How do you implement idempotency in Lambda?
Answer:
- DynamoDB: Store processed event IDs with TTL
- S3: Use conditional writes (If-None-Match)
- Step Functions: Use unique execution IDs
- Custom Logic: Check if result already exists before processing
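The DynamoDB approach usually relies on a conditional write. The handler tries to insert the event ID with `ConditionExpression='attribute_not_exists(event_id)'` and treats a `ConditionalCheckFailedException` as "already processed". The sketch below shows that control flow with an in-memory store standing in for the table, so it runs without AWS. The class and function names are hypothetical. DynamoDB's TTL deletes expired items lazily, so the sketch checks expiry itself rather than assuming expired items are already gone. Powertools for AWS Lambda (Python) also provides an idempotency utility built on the same idea.

```python
import time


class InMemoryIdempotencyStore:
    """Stand-in for a DynamoDB table keyed by event ID (hypothetical).

    claim() mirrors a conditional put: it succeeds only if the key is absent
    or its expiry time has passed.
    """

    def __init__(self, clock=time.time):
        self._expires_at = {}
        self._clock = clock

    def claim(self, key, ttl_seconds):
        now = self._clock()
        expires_at = self._expires_at.get(key)
        if expires_at is not None and expires_at > now:
            return False  # seen recently: treat as a duplicate
        self._expires_at[key] = now + ttl_seconds
        return True

    def release(self, key):
        self._expires_at.pop(key, None)


def process_once(store, event_id, handler, ttl_seconds=7 * 24 * 3600):
    """Run handler() only the first time event_id is seen within the TTL."""
    if not store.claim(event_id, ttl_seconds):
        return None  # duplicate delivery: skip side effects
    try:
        return handler()
    except Exception:
        store.release(event_id)  # let a retry attempt the work again
        raise
```

A duplicate that arrives while the first attempt is still running is also skipped. If the process crashes before `release` runs, the key stays claimed until its TTL passes, so choose the TTL with retry timing in mind.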
Q5: What are Lambda Layers and when should you use them?
Answer: Lambda Layers are ZIP archives containing libraries, custom runtimes, or other dependencies. Use them for:
- Shared libraries across functions
- Large dependencies (pandas, numpy)
- Custom runtimes
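- Keeping the function's own deployment package small (layer contents still count toward the size limit)
Maximum: 5 layers per function; the function code plus all layers must not exceed 250 MB unzipped.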
Cost Considerations
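| Component | Cost (us-east-1, x86) | Notes |
|---|---|---|
| Requests | $0.20 per 1M requests | First 1M free/month |
| Duration | $0.0000166667 per GB-second | First 400,000 GB-seconds free/month |
| Provisioned Concurrency | $0.0000041667 per GB-second provisioned | Plus requests and a reduced duration rate while invocations run |
| Data Transfer | $0.09/GB outbound | Standard AWS data transfer-out rates (internet egress); same-Region transfer to S3, DynamoDB, Kinesis, SQS, and SNS is free |
| EFS | $0.30/GB/month | EFS Standard storage, if mounting EFS for large libraries or models |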
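A rough monthly estimate follows directly from the request and duration rows: GB-seconds = invocations × memory (GB) × average duration (s). The sketch below applies those prices and the monthly free tier. It assumes x86 pricing, ignores provisioned concurrency, data transfer, and extra ephemeral storage, and treats the account-wide free tier as if this were the only function. The constant and function names are hypothetical.

```python
REQUEST_PRICE_PER_MILLION = 0.20  # USD, from the table
GB_SECOND_PRICE = 0.0000166667    # USD, x86 duration price from the table
FREE_REQUESTS = 1_000_000         # per month
FREE_GB_SECONDS = 400_000         # per month


def monthly_lambda_cost(requests, memory_mb, avg_duration_ms, apply_free_tier=True):
    """Estimate monthly request + duration cost in USD, rounded to cents."""
    gb_seconds = requests * (memory_mb / 1024) * (avg_duration_ms / 1000)
    billable_requests, billable_gb_seconds = requests, gb_seconds
    if apply_free_tier:
        billable_requests = max(0, requests - FREE_REQUESTS)
        billable_gb_seconds = max(0.0, gb_seconds - FREE_GB_SECONDS)
    request_cost = billable_requests / 1_000_000 * REQUEST_PRICE_PER_MILLION
    duration_cost = billable_gb_seconds * GB_SECOND_PRICE
    return round(request_cost + duration_cost, 2)


print(monthly_lambda_cost(3_000_000, 512, 200))     # 0.4
print(monthly_lambda_cost(10_000_000, 1024, 1000))  # 161.8
```

In the first case, 300,000 GB-seconds fall entirely within the free tier, so only the 2M billable requests cost anything. Doubling memory doubles GB-seconds per invocation unless the extra CPU shortens the duration, which is the trade-off Lambda Power Tuning measures.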
⚠️
Cost Warning: Lambda costs can add up with high-volume data processing. Monitor invocation counts and duration, and consider Lambda Power Tuning to right-size memory allocation.
Summary
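Lambda is a core building block for serverless data processing. Key takeaways:
- Triggers: S3, Kinesis, DynamoDB, EventBridge, SQS/SNS
- Limits: 15 min timeout, 10 GB memory, 250 MB unzipped package (10 GB as a container image)
- Concurrency: Provisioned to avoid cold starts on critical paths; reserved to guarantee and cap a function's share of account concurrency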
- Best Practices: Idempotent handlers, connection pooling, dead-letter queues
- Patterns: Fan-out/fan-in, streaming ETL, CDC processing
- Cost: Pay-per-use, optimize memory and execution time