Serverless at Scale: Lambda, Step Functions, DynamoDB

Difficulty: Senior/Staff Level | Companies: Amazon, Netflix, Slack, Airbnb, Lyft

Interview Question

"Design a serverless data processing pipeline that handles 1 million events per second with exactly-once processing guarantees."

ℹ️ Key Concepts

This question tests your understanding of serverless patterns, event-driven architecture, and consistency models at massive scale.
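
In practice, "exactly-once" is usually approximated by at-least-once delivery combined with idempotent processing: every event carries a producer-assigned ID, and the side effect is applied only the first time that ID is seen. The sketch below illustrates the idea under stated assumptions. The class name `IdempotentProcessor` is hypothetical, and the in-memory dict stands in for a durable store with conditional writes, which a real pipeline would need.

```python
from typing import Any, Callable, Dict


class IdempotentProcessor:
    """Apply a side effect at most once per event_id.

    The in-memory dict is a stand-in for a durable store with
    conditional writes; that substitution is an assumption of this sketch.
    """

    def __init__(self, apply: Callable[[Dict[str, Any]], Any]):
        self._apply = apply
        self._seen: Dict[str, Any] = {}

    def process(self, event: Dict[str, Any]) -> Any:
        event_id = event["event_id"]
        if event_id in self._seen:
            # Duplicate delivery: return the recorded result, skip the side effect
            return self._seen[event_id]
        result = self._apply(event)
        self._seen[event_id] = result
        return result


applied = []


def record_side_effect(event: Dict[str, Any]) -> int:
    """Hypothetical side effect: remember which events were applied."""
    applied.append(event["event_id"])
    return len(applied)


processor = IdempotentProcessor(record_side_effect)
processor.process({"event_id": "evt-1"})
processor.process({"event_id": "evt-1"})  # redelivered by the stream
processor.process({"event_id": "evt-2"})
print(applied)
```

Deduplication only works when the ID is assigned by the producer. An ID generated inside the consumer differs on every redelivery and cannot detect duplicates.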

Complete Serverless Architecture

Architecture Overview

Architecture Diagram
┌──────────────────────────────────────────────────────────────────────────┐
│                    SERVERLESS PROCESSING PIPELINE                        │
├──────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  ┌───────────────── EVENT SOURCES ──────────────────┐                    │
│  │  API Gateway │ IoT Core │ S3 Events │ Kinesis    │                    │
│  └──────────────────────┬───────────────────────────┘                    │
│                         │                                                │
│  ┌──────────────── INGESTION LAYER ─────────────────┐                    │
│  │  Kinesis Data Streams │ SQS │ EventBridge        │                    │
│  │  Throughput: 1M+ events/sec                      │                    │
│  └──────────────────────┬───────────────────────────┘                    │
│                         │                                                │
│  ┌──────────────── PROCESSING LAYER ────────────────┐                    │
│  │                                                  │                    │
│  │  ┌────────────────────────────────────────────┐  │                    │
│  │  │        Step Functions State Machine        │  │                    │
│  │  │                                            │  │                    │
│  │  │  ┌───────┐    ┌───────┐    ┌───────┐       │  │                    │
│  │  │  │Lambda │───▶│Lambda │───▶│Lambda │       │  │                    │
│  │  │  │Ingest │    │Process│    │Store  │       │  │                    │
│  │  │  └───────┘    └───────┘    └───────┘       │  │                    │
│  │  │                                            │  │                    │
│  │  └────────────────────────────────────────────┘  │                    │
│  │                                                  │                    │
│  └──────────────────────┬───────────────────────────┘                    │
│                         │                                                │
│  ┌───────────────── STORAGE LAYER ──────────────────┐                    │
│  │  DynamoDB │ S3 │ ElastiCache │ Timestream        │                    │
│  └──────────────────────────────────────────────────┘                    │
│                                                                          │
└──────────────────────────────────────────────────────────────────────────┘

Mathematical Foundation: Throughput Calculation

Lambda Concurrency Model:

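  • Requests per second: R = 1,000,000
  • Average duration: D = 100 ms
  • Concurrent executions needed (Little's law): C = R × D / 1000 = 100,000
  • Default account quota: 1,000 concurrent executions per Region, so handling one event per invocation requires a quota increase, while batching many events per invocation lowers the requirement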
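
The concurrency arithmetic above is an application of Little's law (in-flight work equals arrival rate times time in system). A small sketch makes the numbers easy to vary. The function name and the `batch_size` parameter are assumptions added for illustration, and the batched case assumes each invocation still takes about 100 ms, which real workloads may not satisfy.

```python
import math


def required_concurrency(requests_per_sec: float,
                         avg_duration_ms: float,
                         batch_size: int = 1) -> int:
    """Little's law: concurrent executions = invocations/sec x duration in seconds."""
    invocations_per_sec = requests_per_sec / batch_size
    return math.ceil(invocations_per_sec * avg_duration_ms / 1000)


# One event per invocation
print(required_concurrency(1_000_000, 100))
# 100 events per invocation, assuming duration stays near 100 ms
print(required_concurrency(1_000_000, 100, batch_size=100))
```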

DynamoDB Capacity:

  • Write capacity units: WCU = writes/sec × ceil(item_size / 1 KB)
  • Read capacity units: RCU = reads/sec × ceil(item_size / 4 KB) for strongly consistent reads (half that for eventually consistent reads)
  • For 1M events/sec with 1KB items: WCU = 1,000,000
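
The capacity formulas round item size up to the next unit boundary, which is easy to overlook. A minimal sketch, with hypothetical function names, assuming 1 KB = 1024 bytes:

```python
import math


def write_capacity_units(writes_per_sec: int, item_size_bytes: int) -> int:
    """One WCU covers one write per second of an item up to 1 KB."""
    return writes_per_sec * math.ceil(item_size_bytes / 1024)


def read_capacity_units(reads_per_sec: int, item_size_bytes: int,
                        strongly_consistent: bool = True) -> int:
    """One RCU covers one strongly consistent read per second of up to 4 KB;
    eventually consistent reads cost half as much."""
    units = reads_per_sec * math.ceil(item_size_bytes / 4096)
    return units if strongly_consistent else math.ceil(units / 2)


print(write_capacity_units(1_000_000, 1024))
print(read_capacity_units(1_000_000, 1024, strongly_consistent=False))
# A 1.5 KB item rounds up to 2 WCU per write
print(write_capacity_units(10, 1500))
```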

Cost Estimation:

  • Lambda cost: C_Lambda = requests × duration_sec × memory_GB × price_per_gb_second + requests × price_per_request
  • DynamoDB cost: C_Dynamo = WCU × price_per_wcu + RCU × price_per_rcu
  • Total cost: C_total = C_Lambda + C_Dynamo
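
The cost model can be sketched as two small functions. The prices in the example call are placeholders chosen for easy arithmetic, not actual AWS rates, and the function names are hypothetical. Lambda compute is billed in GB-seconds, so the configured memory size enters the formula alongside duration.

```python
def lambda_cost(requests: int, avg_duration_ms: float, memory_mb: int,
                price_per_gb_second: float, price_per_request: float) -> float:
    """Compute charge (GB-seconds) plus per-request charge."""
    gb_seconds = requests * (avg_duration_ms / 1000) * (memory_mb / 1024)
    return gb_seconds * price_per_gb_second + requests * price_per_request


def dynamodb_cost(wcu: int, rcu: int,
                  price_per_wcu: float, price_per_rcu: float) -> float:
    """Provisioned-capacity charge for one billing period."""
    return wcu * price_per_wcu + rcu * price_per_rcu


# Placeholder prices for illustration only; substitute current list prices
compute = lambda_cost(1_000_000, 100, 1024,
                      price_per_gb_second=0.00002, price_per_request=0.0000002)
storage = dynamodb_cost(1000, 500, price_per_wcu=0.001, price_per_rcu=0.0002)
total = compute + storage
print(round(total, 2))
```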

AWS Lambda Implementation

# Lambda function for data processing
import json
import os
import boto3
from datetime import datetime
from typing import Dict, Any, List
from dataclasses import dataclass
import hashlib

@dataclass
class EventData:
    event_id: str
    event_type: str
    timestamp: str
    data: Dict[str, Any]
    source: str

class LambdaProcessor:
    """Serverless data processor"""

    def __init__(self):
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(os.environ['TABLE_NAME'])
        self.s3 = boto3.client('s3')

    def handler(self, event: Dict[str, Any], context) -> Dict[str, Any]:
        """Main Lambda handler"""
        try:
            # Parse and validate event
            event_data = self._parse_event(event)

            # Process based on event type
            result = self._process_event(event_data)

            # Store result
            self._store_result(result)

            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': 'Event processed successfully',
                    'event_id': event_data.event_id
                })
            }

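        except Exception as e:
            # A 500 response suits synchronous API Gateway calls; for Kinesis or
            # SQS triggers, re-raise instead so the batch is retried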
            return {
                'statusCode': 500,
                'body': json.dumps({
                    'error': str(e)
                })
            }

    def _parse_event(self, event: Dict[str, Any]) -> EventData:
        """Parse and validate incoming event"""
        return EventData(
            event_id=event.get('event_id', self._generate_event_id()),
            event_type=event.get('event_type', 'unknown'),
            timestamp=event.get('timestamp', datetime.utcnow().isoformat()),
            data=event.get('data', {}),
            source=event.get('source', 'api')
        )

    def _process_event(self, event_data: EventData) -> Dict[str, Any]:
        """Process event based on type"""
        processors = {
            'order_created': self._process_order,
            'payment_received': self._process_payment,
            'user_signup': self._process_signup
        }

        processor = processors.get(event_data.event_type, self._process_generic)
        return processor(event_data)

    def _process_order(self, event_data: EventData) -> Dict[str, Any]:
        """Process order event"""
        order_data = event_data.data

        # Validate order
        if not self._validate_order(order_data):
            raise ValueError("Invalid order data")

        # Calculate totals
        total = sum(
            item['price'] * item['quantity']
            for item in order_data['items']
        )

        return {
            'event_id': event_data.event_id,
            'type': 'order_processed',
            'order_id': order_data.get('order_id'),
            'total': total,
            'status': 'confirmed',
            'timestamp': event_data.timestamp
        }

    def _process_payment(self, event_data: EventData) -> Dict[str, Any]:
        """Process payment event"""
        payment_data = event_data.data

        return {
            'event_id': event_data.event_id,
            'type': 'payment_processed',
            'payment_id': payment_data.get('payment_id'),
            'amount': payment_data.get('amount'),
            'status': 'completed',
            'timestamp': event_data.timestamp
        }

    def _process_generic(self, event_data: EventData) -> Dict[str, Any]:
        """Generic event processor"""
        return {
            'event_id': event_data.event_id,
            'type': 'generic_processed',
            'data': event_data.data,
            'timestamp': event_data.timestamp
        }

    def _validate_order(self, order_data: Dict) -> bool:
        """Validate order data"""
        required_fields = ['order_id', 'user_id', 'items']
        return all(field in order_data for field in required_fields)

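    def _store_result(self, result: Dict[str, Any]):
        """Store result in DynamoDB"""
        # Imported locally to keep this method self-contained; boto3's resource
        # API rejects Python floats, so numbers are round-tripped to Decimal
        from decimal import Decimal
        item_data = json.loads(json.dumps(result), parse_float=Decimal)

        self.table.put_item(
            Item={
                'PK': f"EVENT#{result['event_id']}",
                'SK': f"RESULT#{datetime.utcnow().isoformat()}",
                'Data': item_data,
                # TTL must be epoch seconds; .timestamp() on a naive utcnow()
                # value is skewed by the local UTC offset, so use now() here
                'TTL': int(datetime.now().timestamp()) + 86400
            }
        )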

    def _generate_event_id(self) -> str:
        """Generate unique event ID"""
        return hashlib.md5(
            f"{datetime.utcnow().isoformat()}{os.urandom(16)}".encode()
        ).hexdigest()

# Lambda handler instance
processor = LambdaProcessor()
handler = processor.handler
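
The handler above expects one flat event per invocation. When Lambda is triggered by a Kinesis stream, it instead receives a batch of base64-encoded records. The sketch below shows one way to decode such a batch and report a partial failure. It assumes `ReportBatchItemFailures` is enabled on the event source mapping. The function name `handle_kinesis_batch` and the sample payloads are hypothetical.

```python
import base64
import json
from typing import Any, Callable, Dict, List


def handle_kinesis_batch(
    event: Dict[str, Any],
    process: Callable[[Dict[str, Any]], None],
) -> Dict[str, List[Dict[str, str]]]:
    """Decode Kinesis records in order and report the first failure."""
    failures: List[Dict[str, str]] = []
    for record in event.get("Records", []):
        kinesis = record["kinesis"]
        try:
            # Kinesis payloads arrive base64-encoded inside the Lambda event
            payload = json.loads(base64.b64decode(kinesis["data"]))
            process(payload)
        except Exception:
            # Lambda retries from this sequence number onward, so stop here
            failures.append({"itemIdentifier": kinesis["sequenceNumber"]})
            break
    return {"batchItemFailures": failures}


def encode(payload: Dict[str, Any]) -> str:
    """Helper for the example: mimic how a payload appears in the event."""
    return base64.b64encode(json.dumps(payload).encode()).decode()


sample_event = {
    "Records": [
        {"kinesis": {"sequenceNumber": "1", "data": encode({"event_id": "a"})}},
        {"kinesis": {"sequenceNumber": "2",
                     "data": base64.b64encode(b"not json").decode()}},
    ]
}
processed: List[Dict[str, Any]] = []
result = handle_kinesis_batch(sample_event, processed.append)
print(result)
```

Because records after the reported sequence number are redelivered, the processing function still needs to be idempotent.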

Step Functions State Machine

# Step Functions state machine for complex workflows
resource "aws_sfn_state_machine" "data_pipeline" {
  name     = "data-processing-pipeline"
  role_arn = aws_iam_role.step_functions.arn

  definition = jsonencode({
    Comment = "Data processing pipeline with error handling"
    StartAt = "IngestData"
    States = {
      IngestData = {
        Type = "Task"
        Resource = aws_lambda_function.ingest.arn
        Retry = [
          {
            ErrorEquals = ["States.TaskFailed"]
            IntervalSeconds = 2
            MaxAttempts = 3
            BackoffRate = 2
          }
        ]
        Catch = [
          {
            ErrorEquals = ["States.ALL"]
            Next = "HandleError"
          }
        ]
        Next = "ValidateData"
      }

      ValidateData = {
        Type = "Task"
        Resource = aws_lambda_function.validate.arn
        Next = "ChoiceState"
      }

      ChoiceState = {
        Type = "Choice"
        Choices = [
          {
            Variable = "$.isValid"
            BooleanEquals = true
            Next = "ProcessData"
          }
        ]
        Default = "HandleValidationError"
      }

      ProcessData = {
        Type = "Parallel"
        Branches = [
          {
            StartAt = "ProcessOrder"
            States = {
              ProcessOrder = {
                Type = "Task"
                Resource = aws_lambda_function.process_order.arn
                End = true
              }
            }
          },
          {
            StartAt = "ProcessInventory"
            States = {
              ProcessInventory = {
                Type = "Task"
                Resource = aws_lambda_function.process_inventory.arn
                End = true
              }
            }
          },
          {
            StartAt = "ProcessPayment"
            States = {
              ProcessPayment = {
                Type = "Task"
                Resource = aws_lambda_function.process_payment.arn
                End = true
              }
            }
          }
        ]
        Next = "AggregateResults"
      }

      AggregateResults = {
        Type = "Task"
        Resource = aws_lambda_function.aggregate.arn
        Next = "StoreResults"
      }

      StoreResults = {
        Type = "Task"
        Resource = aws_lambda_function.store.arn
        Next = "SendNotification"
      }

      SendNotification = {
        Type = "Task"
        Resource = aws_lambda_function.notify.arn
        End = true
      }

      HandleError = {
        Type = "Task"
        Resource = aws_lambda_function.handle_error.arn
        Next = "FailState"
      }

      HandleValidationError = {
        Type = "Task"
        Resource = aws_lambda_function.handle_validation_error.arn
        Next = "FailState"
      }

      FailState = {
        Type = "Fail"
        Cause = "Pipeline failed"
        Error = "PipelineError"
      }
    }
  })
}

# Lambda function for Step Functions
resource "aws_lambda_function" "process_order" {
  filename         = "lambda/process_order.zip"
  function_name    = "process-order"
  role            = aws_iam_role.lambda.arn
  handler         = "index.handler"
  runtime         = "python3.12"
  timeout         = 30
  memory_size     = 256

  environment {
    variables = {
      TABLE_NAME = aws_dynamodb_table.processed_data.name
      QUEUE_URL  = aws_sqs_queue.processing_queue.url
    }
  }

  tracing_config {
    mode = "Active"
  }
}
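
The `Retry` block on `IngestData` (IntervalSeconds = 2, MaxAttempts = 3, BackoffRate = 2) produces an exponential wait schedule. The following sketch computes it. The function name is hypothetical, and the calculation ignores optional fields such as `MaxDelaySeconds` and jitter.

```python
from typing import List


def retry_delays(interval_seconds: float, max_attempts: int,
                 backoff_rate: float) -> List[float]:
    """Wait before each retry: the interval grows by backoff_rate every attempt."""
    return [interval_seconds * backoff_rate ** attempt
            for attempt in range(max_attempts)]


delays = retry_delays(2, 3, 2)
print(delays)        # waits before retry 1, 2, and 3
print(sum(delays))   # worst-case time spent waiting before the Catch applies
```

With these settings a persistently failing task waits 2, 4, and 8 seconds across its three retries before the error falls through to `HandleError`.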

# Step Functions Lambda handlers
import json
import os
import time
import boto3
from datetime import datetime, timezone
from decimal import Decimal
from typing import Dict, Any

dynamodb = boto3.resource('dynamodb')

def ingest_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Ingest data from various sources"""
    source = event.get('source', 'unknown')
    data = event.get('data', {})

    return {
        'statusCode': 200,
        'source': source,
        'raw_data': data,
        # get_remaining_time_in_millis() is the time left before timeout, not
        # a timestamp, so record the wall-clock time instead
        'ingestion_timestamp': datetime.now(timezone.utc).isoformat()
    }

def validate_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Validate incoming data"""
    required_fields = ['id', 'type', 'timestamp']

    validation_errors = []
    for field in required_fields:
        if field not in event.get('raw_data', {}):
            validation_errors.append(f"Missing required field: {field}")

    return {
        'isValid': len(validation_errors) == 0,
        'errors': validation_errors,
        'raw_data': event.get('raw_data', {})
    }

def process_order_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Process order in parallel branch"""
    order_data = event.get('raw_data', {})

    # Process order logic
    processed_order = {
        'order_id': order_data.get('order_id'),
        'status': 'processed',
        'total': sum(
            item.get('price', 0) * item.get('quantity', 0)
            for item in order_data.get('items', [])
        )
    }

    return processed_order

def aggregate_handler(event: Any, context) -> Dict[str, Any]:
    """Aggregate results from parallel branches"""
    # A Parallel state outputs a JSON array with one element per branch,
    # in branch order, so the event itself is the list of results
    results = event if isinstance(event, list) else []

    aggregated = {
        'order': results[0] if len(results) > 0 else None,
        'inventory': results[1] if len(results) > 1 else None,
        'payment': results[2] if len(results) > 2 else None,
        'aggregated_at': datetime.now(timezone.utc).isoformat()
    }

    return aggregated

def store_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Store processed data in DynamoDB"""
    order_id = (event.get('order') or {}).get('order_id')
    # TABLE_NAME is set in the function's Terraform environment block
    table = dynamodb.Table(os.environ.get('TABLE_NAME', 'processed-data'))

    table.put_item(
        Item={
            'PK': f"ORDER#{order_id}",
            'SK': 'PROCESSED',
            # boto3's resource API rejects floats, so convert them to Decimal
            'Data': json.loads(json.dumps(event), parse_float=Decimal),
            # TTL is an absolute epoch time in seconds: now + 24 hours
            'TTL': int(time.time()) + 86400
        }
    )

    return {
        'stored': True,
        'order_id': order_id
    }

def notify_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Send notification about processing completion"""
    sns = boto3.client('sns')

    message = {
        # store_handler returns order_id at the top level of its output
        'orderId': event.get('order_id'),
        'status': 'completed',
        'completedAt': datetime.now(timezone.utc).isoformat()
    }

    sns.publish(
        TopicArn='arn:aws:sns:us-east-1:123456789012:processing-notifications',
        Message=json.dumps(message),
        Subject='Order Processing Complete'
    )

    return {'notified': True}

DynamoDB Design Patterns

# DynamoDB single-table design
import boto3
from boto3.dynamodb.conditions import Key, Attr
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
import json

class DynamoDBRepository:
    """DynamoDB single-table design implementation"""

    def __init__(self, table_name: str):
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(table_name)

    def create_order(self, order_data: Dict[str, Any]) -> Dict[str, Any]:
        """Create order with single-table design"""
        order_id = order_data['order_id']
        user_id = order_data['user_id']
        created_at = datetime.utcnow().isoformat()

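        # Write to table with multiple access patterns. batch_writer buffers the
        # puts into BatchWriteItem calls; the writes are not atomic, so a
        # failure can leave only some of the items written
        with self.table.batch_writer() as batch: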
            # Order item
            batch.put_item(
                Item={
                    'PK': f"ORDER#{order_id}",
                    'SK': f"ORDER#{order_id}",
                    'GSI1PK': f"USER#{user_id}",
                    'GSI1SK': f"ORDER#{created_at}",
                    'GSI2PK': f"STATUS#PENDING",
                    'GSI2SK': f"ORDER#{created_at}",
                    'DataType': 'Order',
                    'Data': order_data,
                    # now() yields a correct epoch; naive utcnow().timestamp() is skewed by the UTC offset
                    'TTL': int((datetime.now() + timedelta(days=30)).timestamp())
                }
            )

            # User order reference, reachable via the base table key USER#<id>.
            # It carries no GSI1 keys; otherwise get_user_orders would return
            # every order twice (once for this item, once for the Order item)
            batch.put_item(
                Item={
                    'PK': f"USER#{user_id}",
                    'SK': f"ORDER#{order_id}",
                    'DataType': 'UserOrder',
                    'TTL': int((datetime.now() + timedelta(days=30)).timestamp())
                }
            )

            # Order items
            for item in order_data.get('items', []):
                batch.put_item(
                    Item={
                        'PK': f"ORDER#{order_id}",
                        'SK': f"ITEM#{item['product_id']}",
                        'DataType': 'OrderItem',
                        'Data': item,
                        # now() yields a correct epoch for the TTL attribute
                        'TTL': int((datetime.now() + timedelta(days=30)).timestamp())
                    }
                )

        return {'orderId': order_id, 'createdAt': created_at}

    def get_order(self, order_id: str) -> Optional[Dict[str, Any]]:
        """Get order by ID"""
        response = self.table.get_item(
            Key={
                'PK': f"ORDER#{order_id}",
                'SK': f"ORDER#{order_id}"
            }
        )
        return response.get('Item')

    def get_user_orders(self, user_id: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Get orders for a user using GSI1"""
        response = self.table.query(
            IndexName='GSI1',
            KeyConditionExpression=Key('GSI1PK').eq(f"USER#{user_id}") &
                                   Key('GSI1SK').begins_with('ORDER#'),
            ScanIndexForward=False,
            Limit=limit
        )
        return response.get('Items', [])

    def get_pending_orders(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Get pending orders using GSI2"""
        response = self.table.query(
            IndexName='GSI2',
            KeyConditionExpression=Key('GSI2PK').eq("STATUS#PENDING") &
                                   Key('GSI2SK').begins_with('ORDER#'),
            ScanIndexForward=False,
            Limit=limit
        )
        return response.get('Items', [])

    def update_order_status(self, order_id: str, new_status: str) -> bool:
        """Update order status"""
        try:
            self.table.update_item(
                Key={
                    'PK': f"ORDER#{order_id}",
                    'SK': f"ORDER#{order_id}"
                },
                UpdateExpression="SET GSI2PK = :status",
                ExpressionAttributeValues={
                    ':status': f"STATUS#{new_status}"
                }
            )
            return True
        except Exception as e:
            print(f"Error updating order: {e}")
            return False

# DynamoDB capacity calculation
# For 1M writes/sec with 1KB items:
# WCU = 1,000,000 (each write of up to 1KB is 1 WCU)
# For 1M reads/sec with 1KB items:
# RCU = 500,000 (each eventually consistent read of up to 4KB is 0.5 RCU)

# Cost estimation (provisioned capacity, assuming us-east-1 list prices of
# $0.00065 per WCU-hour and $0.00013 per RCU-hour; check current pricing):
# WCU cost = 1,000,000 × $0.00065 = $650/hour
# RCU cost = 500,000 × $0.00013 = $65/hour
# Total hourly cost = $715

⚠️ DynamoDB Design

Single-table design is essential for DynamoDB at scale. Use composite primary keys and GSIs to support multiple access patterns efficiently.
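
One caveat for the GSI keys used in this section: a single DynamoDB partition sustains roughly 1,000 write units per second, so a low-cardinality key such as `STATUS#PENDING` becomes a hot partition long before 1M events/sec. A common mitigation is write sharding, sketched below. The function names and the shard count of 10 are assumptions for illustration.

```python
import hashlib
from typing import List


def sharded_key(base_key: str, item_id: str, shard_count: int = 10) -> str:
    """Spread one logical key over shard_count physical keys, chosen by hash."""
    digest = hashlib.sha256(item_id.encode("utf-8")).digest()
    shard = int.from_bytes(digest[:4], "big") % shard_count
    return f"{base_key}#{shard}"


def all_shard_keys(base_key: str, shard_count: int = 10) -> List[str]:
    """Keys a reader must query (and merge) to see the whole logical partition."""
    return [f"{base_key}#{n}" for n in range(shard_count)]


write_key = sharded_key("STATUS#PENDING", "order-123")
read_keys = all_shard_keys("STATUS#PENDING")
print(write_key in read_keys)
```

The trade-off is on the read side: listing pending orders now takes one query per shard, with results merged by sort key in the application.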

EventBridge Integration

# EventBridge rule for event routing
import boto3
import json
from datetime import datetime

class EventBridgePublisher:
    """EventBridge event publisher"""

    def __init__(self, event_bus_name: str = 'default'):
        self.eventbridge = boto3.client('events')
        self.event_bus_name = event_bus_name

    def publish_event(self, detail_type: str, detail: dict, source: str = 'custom.app'):
        """Publish event to EventBridge"""
        response = self.eventbridge.put_events(
            Entries=[
                {
                    'Source': source,
                    'DetailType': detail_type,
                    'Detail': json.dumps(detail),
                    'EventBusName': self.event_bus_name,
                    'Time': datetime.utcnow(),
                    'Resources': []
                }
            ]
        )
        return response

    def publish_order_event(self, order_data: dict):
        """Publish order-related event"""
        self.publish_event(
            detail_type='OrderCreated',
            detail=order_data,
            source='custom.orders'
        )

    def publish_payment_event(self, payment_data: dict):
        """Publish payment-related event"""
        self.publish_event(
            detail_type='PaymentProcessed',
            detail=payment_data,
            source='custom.payments'
        )
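
The publisher above sends one entry per `put_events` call. The API accepts up to 10 entries per request, so at high volume it is usually worth batching. A minimal sketch of the chunking step follows. The helper name is hypothetical, and it does not enforce the separate limit on total request size.

```python
from typing import Any, Dict, Iterator, List

MAX_ENTRIES_PER_CALL = 10  # PutEvents accepts at most 10 entries per request


def batch_entries(entries: List[Dict[str, Any]],
                  batch_size: int = MAX_ENTRIES_PER_CALL
                  ) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive slices of at most batch_size entries."""
    for start in range(0, len(entries), batch_size):
        yield entries[start:start + batch_size]


entries = [{"Source": "custom.orders", "DetailType": "OrderCreated",
            "Detail": "{}"} for _ in range(25)]
batch_sizes = [len(batch) for batch in batch_entries(entries)]
print(batch_sizes)
```

Each batch would be passed as `Entries` to `put_events`. The response's `FailedEntryCount` should be checked, since individual entries can fail while the call itself succeeds.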

# EventBridge rule for routing
"""
{
  "source": ["custom.orders"],
  "detail-type": ["OrderCreated"],
  "detail": {
    "status": ["confirmed"]
  }
}
"""

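# Step Functions integration with EventBridge
# Amazon States Language has no state that waits for an event pattern; a Wait
# state only accepts Seconds, SecondsPath, Timestamp, or TimestampPath.
# Instead, set the state machine as a target of the rule above so that each
# matching event starts an execution with the event as its input.
"""
{
  "StartAt": "ProcessOrder",
  "States": {
    "ProcessOrder": {
      "Type": "Task",
      "Resource": "<process-order Lambda function ARN>",
      "InputPath": "$.detail",
      "End": true
    }
  }
}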
"""

Kinesis Data Streams

# Kinesis producer and consumer
import boto3
import json
from typing import List, Dict, Any
from datetime import datetime
import hashlib

class KinesisProducer:
    """Kinesis data stream producer"""

    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name

    def put_record(self, data: Dict[str, Any], partition_key: str = None):
        """Put single record"""
        if partition_key is None:
            partition_key = hashlib.md5(
                json.dumps(data).encode()
            ).hexdigest()

        response = self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(data),
            PartitionKey=partition_key
        )
        return response

    def put_records(self, records: List[Dict[str, Any]]):
        """Put multiple records"""
        kinesis_records = [
            {
                'Data': json.dumps(record),
                'PartitionKey': hashlib.md5(
                    json.dumps(record).encode()
                ).hexdigest()
            }
            for record in records
        ]

        # Kinesis batch limit is 500
        batch_size = 500
        responses = []

        for i in range(0, len(kinesis_records), batch_size):
            batch = kinesis_records[i:i + batch_size]
            response = self.kinesis.put_records(
                StreamName=self.stream_name,
                Records=batch
            )
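            # PutRecords can partially fail: check response['FailedRecordCount']
            # and resend the entries whose result carries an ErrorCode
            responses.append(response)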

        return responses

class KinesisConsumer:
    """Kinesis data stream consumer that polls shards with GetRecords
    (enhanced fan-out would use SubscribeToShard instead)"""

    def __init__(self, stream_name: str, consumer_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name
        self.consumer_name = consumer_name

    def get_shard_iterator(self, shard_id: str, iterator_type: str = 'LATEST'):
        """Get shard iterator"""
        response = self.kinesis.get_shard_iterator(
            StreamName=self.stream_name,
            ShardId=shard_id,
            ShardIteratorType=iterator_type
        )
        return response['ShardIterator']

    def get_records(self, shard_iterator: str, limit: int = 100) -> Dict[str, Any]:
        """Get one GetRecords response (records plus NextShardIterator)"""
        return self.kinesis.get_records(
            ShardIterator=shard_iterator,
            Limit=limit
        )

    def process_stream(self):
        """Read each shard until the consumer has caught up"""
        # Get shard list
        stream_description = self.kinesis.describe_stream(
            StreamName=self.stream_name
        )
        shards = stream_description['StreamDescription']['Shards']

        for shard in shards:
            shard_id = shard['ShardId']
            shard_iterator = self.get_shard_iterator(shard_id)

            while shard_iterator:
                response = self.get_records(shard_iterator)

                for record in response['Records']:
                    data = json.loads(record['Data'])
                    self.process_record(data)

                # NextShardIterator is a field of the response, not of a record;
                # it is absent once a closed shard has been fully read
                shard_iterator = response.get('NextShardIterator')

                # Stop once caught up with the tip of an open shard
                if not response['Records'] and response.get('MillisBehindLatest', 0) == 0:
                    break

    def process_record(self, record: Dict[str, Any]):
        """Process individual record"""
        print(f"Processing record: {record}")
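
`put_records` is not all-or-nothing: the response lists one result per request entry, in the same order, and throttled entries carry an `ErrorCode` instead of a `SequenceNumber`. The sketch below extracts the entries that need to be resent. The function name and the sample response are illustrative, not output from a real call.

```python
from typing import Any, Dict, List


def failed_records(request_records: List[Dict[str, Any]],
                   response: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the request entries whose positional result carries an ErrorCode."""
    return [
        record
        for record, result in zip(request_records, response["Records"])
        if "ErrorCode" in result
    ]


request = [
    {"Data": '{"id": 1}', "PartitionKey": "a"},
    {"Data": '{"id": 2}', "PartitionKey": "b"},
    {"Data": '{"id": 3}', "PartitionKey": "c"},
]
# Hand-written response in the documented shape, for illustration only
sample_response = {
    "FailedRecordCount": 1,
    "Records": [
        {"SequenceNumber": "1", "ShardId": "shardId-000000000000"},
        {"ErrorCode": "ProvisionedThroughputExceededException",
         "ErrorMessage": "Rate exceeded"},
        {"SequenceNumber": "2", "ShardId": "shardId-000000000001"},
    ],
}
to_retry = failed_records(request, sample_response)
print(to_retry)
```

A producer would resend `to_retry`, ideally with backoff, until the list is empty. Resending can create duplicates downstream, which is another reason the consumer side should be idempotent.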

✅ Serverless Best Practices

Use provisioned concurrency for critical functions, implement dead letter queues, and monitor cold start metrics. Consider Lambda layers for shared dependencies.

Summary

Component      | Purpose                | Configuration
Lambda         | Serverless compute     | Memory, timeout, concurrency
Step Functions | Workflow orchestration | State machines, parallel execution
DynamoDB       | NoSQL database         | Single-table design, GSIs
EventBridge    | Event routing          | Rules, patterns, targets
Kinesis        | Stream processing      | Shards, consumers, producers
