Streaming Architecture Overview
AWS Streaming Architecture

Producers              Stream Processing          Consumers
IoT Devices       -->  Kinesis Data Streams  -->  Lambda
App Servers       -->  MSK (Kafka)           -->  Flink Analytics
Databases (CDC)   -->  EventBridge           -->  S3/Lake
Q1: Compare Kinesis Data Streams vs MSK (Managed Kafka). When would you choose each?
Answer:
| Feature | Kinesis Data Streams | MSK (Kafka) |
|---|---|---|
| Protocol | Kinesis API | Kafka API |
| Retention | 24h - 365d | Configurable |
| Throughput | Per shard (1 MB/s or 1,000 records/s write; 2 MB/s read) | Per broker |
| Cost Model | Per shard-hour | Per broker-hour |
| Ecosystem | AWS-native | Open-source |
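
The per-shard throughput row can be turned into a rough sizing calculation for a provisioned stream. This is a minimal sketch that assumes the standard per-shard limits (1 MB/s or 1,000 records/s for writes, 2 MB/s for shared-throughput reads); the function name `estimate_shards` and its inputs are hypothetical, not part of any AWS API.

```python
import math

# Per-shard limits for a provisioned Kinesis data stream (shared-throughput reads)
WRITE_MB_PER_SHARD = 1.0
WRITE_RECORDS_PER_SHARD = 1000
READ_MB_PER_SHARD = 2.0


def estimate_shards(write_mb_per_s, records_per_s, read_mb_per_s):
    """Return the smallest shard count that satisfies all three limits."""
    by_write_bytes = math.ceil(write_mb_per_s / WRITE_MB_PER_SHARD)
    by_write_records = math.ceil(records_per_s / WRITE_RECORDS_PER_SHARD)
    by_read_bytes = math.ceil(read_mb_per_s / READ_MB_PER_SHARD)
    # A stream always has at least one shard
    return max(1, by_write_bytes, by_write_records, by_read_bytes)
```

Whichever limit is hit first drives the shard count, so many small records can require more shards than the byte rate alone suggests.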
Choose Kinesis when:
- AWS-native integration needed
- Simple producer/consumer patterns
- Quick setup required
- Cost is primary concern
Choose MSK when:
- Existing Kafka expertise
- Complex stream processing
- Need Kafka Connect/Streams
- Multi-cloud portability
Decision Flow:
Do you have existing Kafka code?
  Yes --> MSK
  No  --> Need Kafka Connect/Streams?
            Yes --> MSK
            No  --> Kinesis
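The decision flow can also be written as a small function, which makes the two questions and their order explicit. This is only a sketch of the flow as drawn; the function and parameter names are hypothetical, and a real decision would also weigh the cost and integration points listed earlier.

```python
def choose_streaming_service(has_kafka_code: bool, needs_connect_or_streams: bool) -> str:
    """Mirror the two-question decision flow: existing Kafka code first, then Kafka tooling."""
    if has_kafka_code:
        return "MSK"
    if needs_connect_or_streams:
        return "MSK"
    return "Kinesis"
```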
Q2: Design a real-time fraud detection system using AWS streaming services.
Answer:
Architecture:
Real-Time Fraud Detection Architecture

Transaction Sources: POS Systems, Mobile Apps, E-commerce
        |
        v
Kinesis Streams
        |
        +--> Kinesis Analytics (Flink) --> Fraud Scores --> Alert System
        +--> Lambda (Rules)            --> DynamoDB State
        +--> Kinesis Data Firehose     --> S3 Data Lake
Kinesis Analytics Flink SQL:
-- Detect suspicious patterns
SELECT
customer_id,
COUNT(*) as transaction_count,
SUM(amount) as total_amount,
TUMBLE_START(event_time, INTERVAL '5' MINUTE) as window_start
FROM transactions
WHERE amount > 1000
GROUP BY customer_id, TUMBLE(event_time, INTERVAL '5' MINUTE)
HAVING COUNT(*) > 3 OR SUM(amount) > 10000;
Lambda Fraud Scoring:
import base64
import json

import boto3

# Create the table handle once so warm invocations reuse it
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('customer_profiles')

def lambda_handler(event, context):
    for record in event['Records']:
        # Kinesis delivers the payload to Lambda base64-encoded
        transaction = json.loads(base64.b64decode(record['kinesis']['data']))
        # Get customer history
        response = table.get_item(Key={'customer_id': transaction['customer_id']})
        profile = response.get('Item', {})
        # Calculate fraud score (calculate_fraud_score and send_alert are
        # application-specific helpers not shown here)
        fraud_score = calculate_fraud_score(transaction, profile)
        if fraud_score > 0.8:
            send_alert(transaction, fraud_score)
⚠️ Key Interview Point: Always discuss state management in streaming. For fraud detection, you need to maintain customer profiles and transaction history across windows.
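To make the state-management point concrete, here is a minimal in-memory sketch of per-customer transaction history. It reuses the thresholds from the SQL query (amounts over 1000, more than 3 transactions or more than 10000 in total within 5 minutes), but applies them over a sliding window rather than a tumbling one. The class `VelocityTracker` is hypothetical, it assumes timestamps arrive in order, and in a real pipeline this state would live in DynamoDB or Flink state rather than process memory.

```python
from collections import defaultdict, deque


class VelocityTracker:
    """Keeps recent large transactions per customer in a sliding time window."""

    def __init__(self, window_seconds=300, min_amount=1000, max_count=3, max_total=10000):
        self.window_seconds = window_seconds
        self.min_amount = min_amount
        self.max_count = max_count
        self.max_total = max_total
        self._history = defaultdict(deque)  # customer_id -> deque of (ts, amount)

    def observe(self, customer_id, amount, ts):
        """Record a transaction and return True if the customer looks suspicious."""
        if amount <= self.min_amount:
            return False
        history = self._history[customer_id]
        history.append((ts, amount))
        # Drop entries that have fallen out of the window
        while history and history[0][0] <= ts - self.window_seconds:
            history.popleft()
        total = sum(a for _, a in history)
        return len(history) > self.max_count or total > self.max_total
```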
Q3: How do you handle backpressure in Kinesis Data Streams?
Answer:
Backpressure Causes:
- Consumer processing slower than producer
- Network bandwidth limitations
- Downstream system throttling
Mitigation Strategies:
1. Enhanced Fan-Out:
# Use enhanced fan-out for dedicated throughput (2 MB/s per shard for each registered consumer)
import boto3

kinesis = boto3.client('kinesis')
kinesis.register_stream_consumer(
StreamARN='arn:aws:kinesis:us-east-1:123456789:stream/my-stream',
ConsumerName='my-consumer'
)
2. Shard Splitting:
# Reshard the whole stream evenly; to split one specific hot shard, use split_shard instead
kinesis.update_shard_count(
StreamName='my-stream',
TargetShardCount=16,
ScalingType='UNIFORM_SCALING'
)
3. Batch Processing Optimization:
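# Optimize batch size and window. The keys are illustrative; with a Lambda event
# source mapping they correspond to BatchSize, MaximumBatchingWindowInSeconds
# and ParallelizationFactor.
consumer_config = {
    'max_batch_size': 10000,
    'batch_window_seconds': 5,
    'parallelism': 8
}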
4. Circuit Breaker Pattern:
import time

class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.last_failure = None  # set whenever a call fails
        self.state = 'CLOSED'

    def call(self, func, *args, **kwargs):
        if self.state == 'OPEN':
            # After the timeout, let one trial call through
            if time.time() - self.last_failure > self.reset_timeout:
                self.state = 'HALF_OPEN'
            else:
                raise Exception("Circuit breaker is OPEN")
        try:
            result = func(*args, **kwargs)
            if self.state == 'HALF_OPEN':
                self.state = 'CLOSED'
                self.failure_count = 0
            return result
        except Exception:
            self.failure_count += 1
            if self.failure_count >= self.failure_threshold:
                self.state = 'OPEN'
            self.last_failure = time.time()
            raise
Q4: Explain exactly-once processing semantics in AWS streaming services.
Answer:
Exactly-Once Challenges:
- Network failures causing duplicate delivery
- Consumer crashes mid-processing
- Reprocessing during recovery
AWS Solutions:
1. Kinesis + DynamoDB Transactions:
import json
from datetime import datetime

import boto3

dynamodb = boto3.client('dynamodb')

def process_with_exactly_once(record):
    sequence_number = record['kinesis']['sequenceNumber']
    # Check if already processed
    response = dynamodb.get_item(
        TableName='processed_records',
        Key={'sequence_number': {'S': sequence_number}}
    )
    if 'Item' in response:
        return  # Already processed
    # Process and mark as processed in one transaction. The condition closes the
    # gap between the check above and this write: if another worker stored the
    # marker first, the whole transaction is cancelled.
    # (record['result_id'] and record['data'] are assumed to be set upstream.)
    dynamodb.transact_write_items(
        TransactItems=[
            {
                'Put': {
                    'TableName': 'processed_records',
                    'Item': {
                        'sequence_number': {'S': sequence_number},
                        'processed_at': {'S': datetime.now().isoformat()}
                    },
                    'ConditionExpression': 'attribute_not_exists(sequence_number)'
                }
            },
            {
                'Put': {
                    'TableName': 'results',
                    'Item': {
                        'result_id': {'S': record['result_id']},
                        'data': {'S': json.dumps(record['data'])}
                    }
                }
            }
        ]
    )
2. MSK + Kafka Transactions:
// Kafka producer with transactions
// (bootstrap.servers and serializer settings omitted for brevity)
Properties props = new Properties();
props.put("transactional.id", "my-transactional-id");
props.put("enable.idempotence", "true");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
producer.beginTransaction();
try {
    for (ConsumerRecord<String, String> record : records) {
        producer.send(new ProducerRecord<>("output-topic",
                record.key(), record.value()));
    }
    // In a consume-transform-produce loop, also commit the consumed offsets
    // inside the transaction with producer.sendOffsetsToTransaction(...)
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}
3. Idempotent Writes to S3:
# Use content hash for idempotent writes
import hashlib
import json

import boto3

s3 = boto3.client('s3')

def write_idempotent(data, bucket, key):
    # Serialize once with sorted keys so equal data always yields the same bytes
    body = json.dumps(data, sort_keys=True)
    content_hash = hashlib.md5(body.encode()).hexdigest()
    s3_key = f"{key}/{content_hash}.json"
    s3.put_object(
        Bucket=bucket,
        Key=s3_key,
        Body=body
    )
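The reason a content hash makes the write idempotent is that the object key depends only on the data, so a replayed record overwrites the same object instead of creating a new one. The sketch below isolates that key derivation without touching S3. The helper name `idempotent_key` is hypothetical, and it uses SHA-256 rather than the MD5 shown above, purely as an illustration.

```python
import hashlib
import json


def idempotent_key(prefix: str, data: dict) -> str:
    """Derive an object key that is identical for identical data."""
    # Sorted keys and fixed separators give a canonical serialization
    canonical = json.dumps(data, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"{prefix}/{digest}.json"
```

Two dictionaries with the same fields in a different order map to the same key, while any change in a value produces a different key.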
Q5: Design a real-time analytics dashboard using AWS streaming services.
Answer:
Architecture:
Real-Time Analytics Dashboard Architecture

Data Sources: Clickstream, API Logs, IoT Sensors
        |
        v
Kinesis Data Firehose
        |
        +--> S3 (Raw)          --> Athena (Ad-hoc)    --+
        +--> Kinesis Analytics --> Lambda (Real-time) --+--> QuickSight Dashboard
Real-Time Aggregation with Lambda:
import base64
import json

import boto3

cloudwatch = boto3.client('cloudwatch')

def lambda_handler(event, context):
    # Aggregate metrics in real-time
    metrics = {}
    for record in event['Records']:
        # Kinesis payloads arrive base64-encoded
        data = json.loads(base64.b64decode(record['kinesis']['data']))
        metric_name = data['metric_name']
        if metric_name not in metrics:
            metrics[metric_name] = {'count': 0, 'sum': 0, 'min': float('inf'), 'max': float('-inf')}
        metrics[metric_name]['count'] += 1
        metrics[metric_name]['sum'] += data['value']
        metrics[metric_name]['min'] = min(metrics[metric_name]['min'], data['value'])
        metrics[metric_name]['max'] = max(metrics[metric_name]['max'], data['value'])
    # Publish to CloudWatch
    for metric_name, values in metrics.items():
        cloudwatch.put_metric_data(
            Namespace='RealTimeAnalytics',
            MetricData=[
                {
                    'MetricName': metric_name,
                    'Dimensions': [
                        {'Name': 'TimeWindow', 'Value': '1-minute'}
                    ],
                    'StatisticValues': {
                        'Sum': values['sum'],
                        'Minimum': values['min'],
                        'Maximum': values['max'],
                        'SampleCount': values['count']
                    },
                    'Unit': 'Count'
                }
            ]
        )
Q6: How do you implement schema evolution in Kinesis Data Streams?
Answer:
Schema Registry Integration:
# Using Glue Schema Registry with Kinesis
import json

import boto3

# Register schema
glue = boto3.client('glue')
schema_definition = {
    'type': 'record',
    'name': 'Transaction',
    'fields': [
        {'name': 'transaction_id', 'type': 'string'},
        {'name': 'amount', 'type': 'double'},
        {'name': 'timestamp', 'type': 'long'}
    ]
}
# register_schema_version adds a new version to an existing schema;
# the definition is passed as a JSON string
glue.register_schema_version(
    SchemaId={
        'RegistryName': 'my-registry',
        'SchemaName': 'transaction-schema'
    },
    SchemaDefinition=json.dumps(schema_definition)
)
Consumer Schema Handling:
class SchemaEvolutionHandler:
    def __init__(self):
        self.schema_versions = {}

    def process_record(self, record):
        schema_version = record['metadata']['schema_version']
        # Get or cache schema
        if schema_version not in self.schema_versions:
            self.schema_versions[schema_version] = self.fetch_schema(schema_version)
        schema = self.schema_versions[schema_version]
        # Handle missing fields with defaults
        for field in schema['fields']:
            if field['name'] not in record['data']:
                record['data'][field['name']] = self.get_default_value(field['type'])
        return record
Schema Evolution Strategies:
Schema Evolution Strategies

Forward Compatibility:  Producer adds new field -> Consumer ignores unknown
Backward Compatibility: Producer removes field  -> Consumer uses default value
Full Compatibility:     Both directions work with default values
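The two consumer-side behaviours named above (ignore unknown fields, fall back to a default for missing ones) can be sketched in a few lines. The function `conform_to_schema` and the `AVRO_TYPE_DEFAULTS` table are hypothetical. Falling back to a zero-like value when the schema declares no default is an assumption made for illustration, since a strict Avro reader would reject such a record instead.

```python
# Zero-like fallbacks per primitive type (an illustrative policy, not Avro behaviour)
AVRO_TYPE_DEFAULTS = {
    "string": "", "int": 0, "long": 0,
    "float": 0.0, "double": 0.0, "boolean": False, "null": None,
}


def conform_to_schema(data: dict, schema: dict) -> dict:
    """Project a decoded record onto the consumer's schema."""
    conformed = {}
    for field in schema["fields"]:
        name = field["name"]
        if name in data:
            conformed[name] = data[name]
        elif "default" in field:
            # Prefer the default declared in the schema
            conformed[name] = field["default"]
        else:
            field_type = field["type"]
            conformed[name] = (
                AVRO_TYPE_DEFAULTS.get(field_type) if isinstance(field_type, str) else None
            )
    # Fields present in data but absent from the schema are dropped
    return conformed
```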
Q7: Explain windowing strategies in Kinesis Analytics.
Answer:
Window Types:
1. Tumbling Windows (Fixed):
-- 5-minute tumbling windows
SELECT
product_id,
COUNT(*) as sales_count,
SUM(amount) as total_sales,
TUMBLE_START(event_time, INTERVAL '5' MINUTE) as window_start
FROM sales_stream
GROUP BY product_id, TUMBLE(event_time, INTERVAL '5' MINUTE);
2. Sliding Windows:
-- 10-minute window sliding every 1 minute
-- (Flink SQL calls this a HOP window; arguments are time attribute, slide, size)
SELECT
    customer_id,
    AVG(amount) as avg_amount,
    HOP_START(event_time, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE) as window_start
FROM transactions
GROUP BY customer_id, HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE);
3. Session Windows:
-- Session windows with 30-minute gap
SELECT
    user_id,
    COUNT(*) as actions,
    SESSION_START(event_time, INTERVAL '30' MINUTE) as session_start
FROM user_events
GROUP BY user_id, SESSION(event_time, INTERVAL '30' MINUTE);
4. Custom Windows with Lambda:
# Custom windowing logic
# (get_window_key, is_window_complete and process_window are left to the application)
class CustomWindow:
    def __init__(self, window_size, slide_interval):
        self.window_size = window_size
        self.slide_interval = slide_interval
        self.windows = {}

    def add_event(self, event):
        window_key = self.get_window_key(event['timestamp'])
        if window_key not in self.windows:
            self.windows[window_key] = []
        self.windows[window_key].append(event)
        # Trigger processing for complete windows
        if self.is_window_complete(window_key):
            self.process_window(window_key, self.windows[window_key])
Windowing Comparison:
Windowing Strategies Comparison

Tumbling Windows:
|----W1----|----W2----|----W3----|
(Non-overlapping, fixed size)

Sliding Windows:
|----W1----|
   |----W2----|
      |----W3----|
(Overlapping, slide interval < window size)

Session Windows:
|--W1--|   |--W2---------|   |--W3--|
(Dynamic size based on activity)
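The three window types differ only in how an event timestamp maps to window boundaries, which the following sketch shows with plain integers (for example epoch seconds). The function names are hypothetical. Windows are half-open intervals `[start, end)`, and the session function merges events whose gap is strictly smaller than the configured gap.

```python
def tumbling_window(ts: int, size: int) -> tuple:
    """Return the single fixed window that contains ts."""
    start = ts - (ts % size)
    return (start, start + size)


def sliding_windows(ts: int, size: int, slide: int) -> list:
    """Return every overlapping window of the given size that contains ts."""
    windows = []
    start = ts - (ts % slide)  # latest window start at or before ts
    while start > ts - size:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


def session_windows(timestamps: list, gap: int) -> list:
    """Group timestamps into sessions separated by at least `gap` of inactivity."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][1] < gap:
            sessions[-1][1] = ts  # extend the current session
        else:
            sessions.append([ts, ts])  # start a new session
    return [tuple(s) for s in sessions]
```

An event belongs to exactly one tumbling window, to `size / slide` sliding windows, and to a session whose length is known only once the inactivity gap has passed.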
Q8: How do you implement dead letter queues for streaming failures?
Answer:
DLQ Architecture:
DLQ Pattern for Streaming

Kinesis Stream
      |
      v
Consumer Lambda --> Processing Function --> Success Output
                          |
                          | Failure
                          v
                      DLQ (SQS)
                          |
                          v
                    Alert & Retry
Implementation:
import json
from datetime import datetime

import boto3

sqs = boto3.client('sqs')
dlq_url = 'https://sqs.us-east-1.amazonaws.com/123456789/my-dlq'

def lambda_handler(event, context):
    for record in event['Records']:
        try:
            process_record(record)
        except Exception as e:
            # Send to DLQ with error details
            sqs.send_message(
                QueueUrl=dlq_url,
                MessageBody=json.dumps({
                    'record': record,
                    'error': str(e),
                    'timestamp': datetime.now().isoformat(),
                    'retry_count': 0
                }),
                MessageAttributes={
                    'ErrorType': {
                        'DataType': 'String',
                        'StringValue': type(e).__name__
                    }
                }
            )
DLQ Processing Lambda:
def process_dlq(event, context):
    for record in event['Records']:
        message = json.loads(record['body'])
        # Check retry count
        if message['retry_count'] < 3:
            # Retry processing
            try:
                process_record(message['record'])
            except Exception as e:
                # Update retry count and send back to DLQ
                message['retry_count'] += 1
                message['error'] = str(e)
                sqs.send_message(
                    QueueUrl=dlq_url,
                    MessageBody=json.dumps(message)
                )
        else:
            # Max retries exceeded, alert
            send_alert(message)
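Requeueing a failed message immediately tends to hit the same failure again, so a retry loop like the one above is often paired with a growing delay. The sketch below separates that decision from the SQS calls. The names `retry_delay_seconds` and `next_action` are hypothetical, the 30-second base is an arbitrary assumption, and the 900-second cap reflects the maximum `DelaySeconds` that SQS accepts for a message.

```python
SQS_MAX_DELAY_SECONDS = 900  # SQS caps DelaySeconds at 15 minutes
MAX_RETRIES = 3


def retry_delay_seconds(retry_count: int, base_seconds: int = 30) -> int:
    """Exponential backoff: base, 2x base, 4x base, ... capped at the SQS limit."""
    return min(base_seconds * (2 ** retry_count), SQS_MAX_DELAY_SECONDS)


def next_action(message: dict) -> dict:
    """Decide what to do with a DLQ message whose latest processing attempt failed."""
    if message["retry_count"] >= MAX_RETRIES:
        return {"action": "alert"}
    # Build a new message rather than mutating the input
    updated = {**message, "retry_count": message["retry_count"] + 1}
    return {
        "action": "requeue",
        "message": updated,
        "delay_seconds": retry_delay_seconds(message["retry_count"]),
    }
```

The returned `delay_seconds` would be passed as `DelaySeconds` when the message is sent back to the queue.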
Q9: Design a real-time ETL pipeline using Kinesis Data Firehose.
Answer:
Architecture:
Real-Time ETL with Kinesis Data Firehose

Data Sources
      |
      v
Kinesis Data Firehose --> Firehose Stream --> Lambda Transform
                                                    |
                                                    v
      S3 (Data Lake) | Redshift (DW) | OpenSearch (Search) | Splunk
Firehose Configuration:
import boto3

firehose = boto3.client('firehose')
# Create delivery stream with transformation
firehose.create_delivery_stream(
DeliveryStreamName='real-time-etl-stream',
DeliveryStreamType='DirectPut',
ExtendedS3DestinationConfiguration={
'RoleARN': 'arn:aws:iam::123456789:role/firehose-role',  # placeholder account ID, as in the other ARNs
'BucketARN': 'arn:aws:s3:::data-lake-bucket',
'Prefix': 'raw/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/',
'ErrorOutputPrefix': 'errors/',
'ProcessingConfiguration': {
'Enabled': True,
'Processors': [
{
'Type': 'Lambda',
'Parameters': [
{
'ParameterName': 'LambdaArn',
'ParameterValue': 'arn:aws:lambda:us-east-1:123456789:function:transform-function'
},
{
'ParameterName': 'BufferSizeInMBs',
'ParameterValue': '3'
},
{
'ParameterName': 'BufferIntervalInSeconds',
'ParameterValue': '60'
}
]
}
]
}
}
)
Lambda Transformation:
import base64
import json
from datetime import datetime

def lambda_handler(event, context):
    output = []
    for record in event['records']:
        # Decode input
        payload = base64.b64decode(record['data']).decode('utf-8')
        data = json.loads(payload)
        # Transform
        transformed = {
            'event_id': data['id'],
            'event_type': data['type'].upper(),
            'timestamp': data['ts'],
            'processed_at': datetime.now().isoformat(),
            'amount': float(data['amount'])
        }
        # Encode output
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(transformed).encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)
    return {'records': output}
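A transformation Lambda has to return every `recordId` it received, so one malformed record should be reported rather than allowed to fail the whole batch. The following sketch shows one way to do that using the `ProcessingFailed` result status. The field names follow the transformation above, `transform_record` is a hypothetical helper, and appending a newline to each record is an assumed convention so that delivered objects contain one JSON document per line.

```python
import base64
import json


def transform_record(record: dict) -> dict:
    """Transform one Firehose record, marking it failed instead of raising."""
    try:
        data = json.loads(base64.b64decode(record["data"]))
        transformed = {
            "event_id": data["id"],
            "event_type": data["type"].upper(),
            "amount": float(data["amount"]),
        }
    except (ValueError, KeyError, TypeError, AttributeError):
        # Hand the original payload back so Firehose can route it to the error prefix
        return {
            "recordId": record["recordId"],
            "result": "ProcessingFailed",
            "data": record["data"],
        }
    payload = (json.dumps(transformed) + "\n").encode("utf-8")
    return {
        "recordId": record["recordId"],
        "result": "Ok",
        "data": base64.b64encode(payload).decode("utf-8"),
    }


def lambda_handler(event, context):
    return {"records": [transform_record(r) for r in event["records"]]}
```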
Q10: How do you implement real-time data enrichment in streaming pipelines?
Answer:
Enrichment Patterns:
1. Lambda-Based Enrichment:
import base64
import json
from datetime import datetime

import boto3

dynamodb = boto3.resource('dynamodb')
enrichment_table = dynamodb.Table('customer_profiles')

def lambda_handler(event, context):
    enriched_records = []
    for record in event['Records']:
        # Kinesis payloads arrive base64-encoded
        data = json.loads(base64.b64decode(record['kinesis']['data']))
        # Enrich with customer data
        response = enrichment_table.get_item(
            Key={'customer_id': data['customer_id']}
        )
        customer = response.get('Item', {})
        enriched_record = {
            **data,
            'customer_name': customer.get('name', 'Unknown'),
            'customer_tier': customer.get('tier', 'Standard'),
            'enriched_at': datetime.now().isoformat()
        }
        enriched_records.append(enriched_record)
    return {'records': enriched_records}
2. DynamoDB Streams Enrichment:
Real-Time Enrichment Architecture

Kinesis Stream --> Lambda Enrichment --> Enriched Stream
                        |
                        | Query
                        v
                 DynamoDB (Lookup)
                        |
                        v
                 DynamoDB Streams
                        |
                        | Update
                        v
                 Cache (ElastiCache)
3. Cache-Enhanced Enrichment:
import base64
import json

import redis

redis_client = redis.Redis(host='my-redis-cluster.xxx.cache.amazonaws.com', port=6379)

def lambda_handler(event, context):
    for record in event['Records']:
        # Kinesis payloads arrive base64-encoded
        data = json.loads(base64.b64decode(record['kinesis']['data']))
        # Check cache first
        cache_key = f"customer:{data['customer_id']}"
        cached = redis_client.get(cache_key)
        if cached:
            customer = json.loads(cached)
        else:
            # Fetch from DynamoDB (fetch_from_dynamodb is an application helper)
            customer = fetch_from_dynamodb(data['customer_id'])
            # Cache for 5 minutes
            redis_client.setex(cache_key, 300, json.dumps(customer))
        # Enrich and continue processing
        enriched = {**data, **customer}
        process_enriched(enriched)
Q11: Explain backfill strategies for streaming data.
Answer:
Backfill Architecture:
Backfill Strategy

Historical Data         Backfill Process          Current Data
S3 (Archive)       -->  EMR/Glue (Reprocess) -->  S3 (Updated)

Current Stream     -->  Kinesis Streams
                              |
                              v
                        Unified View
Implementation:
import json

import boto3

def backfill_stream(backfill_start, backfill_end):
    """Backfill historical data into current stream"""
    # Read historical data from S3 (read_from_s3, transform_to_current_format
    # and chunk are application helpers not shown here)
    historical_data = read_from_s3(
        bucket='archive-bucket',
        start_date=backfill_start,
        end_date=backfill_end
    )
    # Transform to match current format
    transformed = transform_to_current_format(historical_data)
    # Send to Kinesis stream; put_records accepts at most 500 records per call
    # and can fail partially, so check FailedRecordCount in the response
    kinesis = boto3.client('kinesis')
    for batch in chunk(transformed, 500):
        kinesis.put_records(
            StreamName='current-stream',
            Records=[
                {
                    'Data': json.dumps(record).encode('utf-8'),
                    'PartitionKey': record['partition_key']
                }
                for record in batch
            ]
        )
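Because `put_records` reports per-record failures in its response instead of raising, a backfill that ignores the response can silently drop data. The sketch below shows a possible `chunk` helper and a resend loop for the failed entries. `put_with_retry` is a hypothetical name, and the client is passed in so that the logic can be exercised with a stub. It relies on the response listing results in request order, with an `ErrorCode` on each failed entry.

```python
import time


def chunk(items, size):
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def put_with_retry(client, stream_name, entries, max_attempts=3, backoff_seconds=0.0):
    """Send entries in batches of up to 500 and return those that still failed."""
    unsent = []
    for batch in chunk(entries, 500):
        pending = batch
        for attempt in range(max_attempts):
            response = client.put_records(StreamName=stream_name, Records=pending)
            if response["FailedRecordCount"] == 0:
                pending = []
                break
            # Keep only the entries whose result carries an ErrorCode
            pending = [
                entry
                for entry, result in zip(pending, response["Records"])
                if "ErrorCode" in result
            ]
            time.sleep(backoff_seconds * (2 ** attempt))
        unsent.extend(pending)
    return unsent
```

Anything returned by `put_with_retry` has exhausted its attempts and is a natural candidate for a dead letter queue.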
Idempotent Backfill:
from datetime import datetime

import boto3

dynamodb = boto3.client('dynamodb')

def idempotent_backfill(record):
    """Ensure backfill doesn't create duplicates"""
    # Check if record already exists (the low-level client needs typed
    # attribute values; record['id'] is assumed to be a string)
    response = dynamodb.get_item(
        TableName='processed_records',
        Key={'record_id': {'S': record['id']}}
    )
    if 'Item' in response:
        return  # Skip, already processed
    # Process and mark as processed
    process_record(record)
    dynamodb.put_item(
        TableName='processed_records',
        Item={
            'record_id': {'S': record['id']},
            'processed_at': {'S': datetime.now().isoformat()},
            'source': {'S': 'backfill'}
        }
    )
Q12: How do you implement real-time anomaly detection in streaming data?
Answer:
Anomaly Detection Architecture:
Real-Time Anomaly Detection

Kinesis Stream
      |
      v
Kinesis Analytics (Flink) --> Anomaly Detection (ML Model) --> Alert System
                                        |
                                        v
                                  S3 (Anomalies)
Statistical Anomaly Detection:
class AnomalyDetector:
    def __init__(self, window_size=100, threshold=3.0):
        self.window_size = window_size
        self.threshold = threshold
        self.window = []

    def add_value(self, value):
        self.window.append(value)
        if len(self.window) > self.window_size:
            self.window.pop(0)

    def is_anomaly(self, value):
        # Wait for a minimum number of samples before judging
        if len(self.window) < 10:
            return False
        mean = sum(self.window) / len(self.window)
        std = (sum((x - mean) ** 2 for x in self.window) / len(self.window)) ** 0.5
        if std == 0:
            return False
        z_score = (value - mean) / std
        return abs(z_score) > self.threshold
Kinesis Analytics Anomaly Detection:
-- Statistical anomaly detection
WITH stats AS (
SELECT
metric_name,
AVG(value) as avg_value,
STDDEV(value) as std_value,
COUNT(*) as sample_count
FROM metrics_stream
GROUP BY metric_name
)
SELECT
m.metric_name,
m.value,
s.avg_value,
s.std_value,
CASE
WHEN ABS(m.value - s.avg_value) > 3 * s.std_value THEN 'ANOMALY'
ELSE 'NORMAL'
END as anomaly_flag
FROM metrics_stream m
JOIN stats s ON m.metric_name = s.metric_name;
ℹ️ Interview Tip: Discuss both statistical methods (z-score, moving average) and ML-based approaches (Isolation Forest, Autoencoders) for anomaly detection.
Q13: Design a multi-region streaming architecture for global applications.
Answer:
Global Streaming Architecture:
Multi-Region Streaming Architecture

Region 1 (us-east-1)              Region 2 (eu-west-1)
Kinesis Stream (Primary)   -->    Kinesis Stream (Replica)
        |                                 |
        v                                 v
Processing (Lambda/Flink)         Processing (Lambda/Flink)
        |                                 |
        v                                 v
S3 (Regional)                     S3 (Regional)
        |                                 |
        +----------------+----------------+
                         v
            Global Redshift (Replicated)
Cross-Region Replication:
# Simplified cross-region replicator, similar in spirit to Kafka's MirrorMaker.
# This sketch copies only the first shard of the source stream.
import time

import boto3

def replicate_streams(source_region, dest_region, stream_name):
    source_kinesis = boto3.client('kinesis', region_name=source_region)
    dest_kinesis = boto3.client('kinesis', region_name=dest_region)
    # Get shard iterator
    response = source_kinesis.describe_stream(StreamName=stream_name)
    shard_id = response['StreamDescription']['Shards'][0]['ShardId']
    iterator = source_kinesis.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType='LATEST'
    )['ShardIterator']
    while iterator:
        records = source_kinesis.get_records(ShardIterator=iterator, Limit=100)
        # Replicate to destination; put_records rejects an empty batch
        if records['Records']:
            dest_kinesis.put_records(
                StreamName=stream_name,
                Records=[
                    {
                        'Data': record['Data'],
                        'PartitionKey': record['PartitionKey']
                    }
                    for record in records['Records']
                ]
            )
        iterator = records.get('NextShardIterator')
        time.sleep(1)  # stay under the per-shard limit of 5 reads per second
Q14: How do you implement exactly-once semantics with Kinesis and Lambda?
Answer:
Exactly-Once Pattern:
Exactly-Once with Kinesis + Lambda

Kinesis Stream --> Lambda (Idempotent) --> DynamoDB (State)
                        |                        |
                        | Check                  | Write
                        v                        v
              DynamoDB (Sequence Check)     S3 (Output)
Implementation:
import base64
import json
from datetime import datetime

import boto3

dynamodb = boto3.resource('dynamodb')
state_table = dynamodb.Table('processing_state')

def lambda_handler(event, context):
    for record in event['Records']:
        sequence_number = record['kinesis']['sequenceNumber']
        # eventID has the form "<shardId>:<sequenceNumber>"; the partition key
        # is not the shard ID
        shard_id = record['eventID'].split(':')[0]
        # Check if already processed
        response = state_table.get_item(
            Key={
                'shard_id': shard_id,
                'sequence_number': sequence_number
            }
        )
        if 'Item' in response:
            print(f"Record {sequence_number} already processed, skipping")
            continue
        # Process record (the payload arrives base64-encoded)
        data = json.loads(base64.b64decode(record['kinesis']['data']))
        result = process_data(data)
        # Write result and state atomically. transact_write_items is a client
        # operation, not a Table method; the resource's client accepts plain
        # Python values.
        dynamodb.meta.client.transact_write_items(
            TransactItems=[
                {
                    'Put': {
                        'TableName': 'processing_state',
                        'Item': {
                            'shard_id': shard_id,
                            'sequence_number': sequence_number,
                            'processed_at': datetime.now().isoformat()
                        }
                    }
                },
                {
                    'Put': {
                        'TableName': 'results',
                        'Item': {
                            'result_id': data['id'],
                            'result': result,
                            'processed_at': datetime.now().isoformat()
                        }
                    }
                }
            ]
        )
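The deduplication key for this pattern needs the shard ID, which a Lambda Kinesis event carries in `eventID` (formatted as `shardId-...:sequenceNumber`) rather than in the partition key. A small helper, sketched below with hypothetical names, extracts both parts so they can be used as the state table key.

```python
def parse_event_id(event_id: str) -> tuple:
    """Split a Kinesis eventID into (shard_id, sequence_number)."""
    shard_id, _, sequence_number = event_id.partition(":")
    if not shard_id or not sequence_number:
        raise ValueError(f"unexpected eventID format: {event_id!r}")
    return shard_id, sequence_number


def dedup_key(record: dict) -> dict:
    """Build the composite key used to mark a record as processed."""
    shard_id, sequence_number = parse_event_id(record["eventID"])
    return {"shard_id": shard_id, "sequence_number": sequence_number}
```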
Q15: Explain stream processing with exactly-once guarantees using MSK.
Answer:
Kafka Exactly-Once Pattern:
Kafka Exactly-Once Architecture

Producer (Idempotent)
      |
      v
Kafka (Input) --> Consumer (Transaction) --> Kafka (Output)
                        |
                        | Commit
                        v
                 Consumer Offsets
Spring Kafka Transactional Producer:
@Configuration
public class KafkaConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txn-");
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
@Service
public class TransactionalProcessor {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Transactional
public void processAndProduce(List<ConsumerRecord<String, String>> records) {
kafkaTemplate.executeInTransaction(template -> {
for (ConsumerRecord<String, String> record : records) {
// Process record
String result = processRecord(record);
// Send to output topic
template.send("output-topic", record.key(), result);
}
return null;
});
}
}
Q16: How do you monitor and troubleshoot streaming pipelines?
Answer:
Monitoring Dashboard:
Streaming Pipeline Monitoring

Key Metrics
| Incoming Records/s | Outgoing Records/s | Iterator Age (hours) | Processing Latency |
|---|---|---|---|
| 1,250 | 1,200 | 0.5 | 150ms |

Shard-Level Metrics
| Shard 0 | Shard 1 | Shard 2 | Shard 3 | Shard 4 |
|---|---|---|---|---|
| 250/s | 280/s | 220/s | 240/s | 260/s |

Error Analysis
- Lambda Errors: 12 (0.1%)
- DLQ Messages: 5
- Throttling Events: 23
CloudWatch Alarms:
import boto3

cloudwatch = boto3.client('cloudwatch')
# Iterator age alarm
cloudwatch.put_metric_alarm(
AlarmName='KinesisIteratorAge',
MetricName='GetRecords.IteratorAgeMilliseconds',
Namespace='AWS/Kinesis',
Statistic='Maximum',
Period=300,
EvaluationPeriods=2,
Threshold=3600000, # 1 hour in milliseconds
ComparisonOperator='GreaterThanThreshold',
AlarmActions=['arn:aws:sns:us-east-1:123456789:alerts'],
Dimensions=[
{'Name': 'StreamName', 'Value': 'my-stream'}
]
)
# Throttling alarm
cloudwatch.put_metric_alarm(
AlarmName='KinesisThrottling',
MetricName='ReadProvisionedThroughputExceeded',
Namespace='AWS/Kinesis',
Statistic='Sum',
Period=300,
EvaluationPeriods=1,
Threshold=10,
ComparisonOperator='GreaterThanThreshold',
AlarmActions=['arn:aws:sns:us-east-1:123456789:alerts'],
Dimensions=[
{'Name': 'StreamName', 'Value': 'my-stream'}
]
)
Q17: Design a stream processing application with state management.
Answer:
State Management Patterns:
1. External State Store (DynamoDB):
from datetime import datetime

import boto3

class StatefulProcessor:
    def __init__(self):
        self.dynamodb = boto3.resource('dynamodb')
        self.state_table = self.dynamodb.Table('processing_state')

    def process_with_state(self, record):
        # Get current state
        response = self.state_table.get_item(
            Key={'key': record['key']}
        )
        current_state = response.get('Item', {'count': 0, 'sum': 0})
        # Update state
        new_state = {
            'count': current_state['count'] + 1,
            'sum': current_state['sum'] + record['value'],
            'last_updated': datetime.now().isoformat()
        }
        # Save state. This read-modify-write is not atomic; with concurrent
        # writers, prefer update_item with an ADD expression.
        self.state_table.put_item(Item={
            'key': record['key'],
            **new_state
        })
        return new_state
2. Windowed State with Kinesis Analytics:
-- Maintain running aggregates
SELECT
customer_id,
COUNT(*) OVER w as running_count,
SUM(amount) OVER w as running_sum,
AVG(amount) OVER w as running_avg
FROM transactions
WINDOW w AS (
PARTITION BY customer_id
ORDER BY event_time
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
);
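To make the semantics of the window query concrete, here is a minimal pure-Python sketch of the same per-customer running aggregates. The function name and the sample records are illustrative assumptions, and sorting the input stands in for `ORDER BY event_time`; a real stream would arrive incrementally.

```python
from collections import defaultdict


def running_aggregates(transactions):
    """Yield per-customer running count, sum and average in event-time order."""
    totals = defaultdict(lambda: {"count": 0, "sum": 0.0})
    # Sorting stands in for ORDER BY event_time in the SQL version.
    for tx in sorted(transactions, key=lambda t: t["event_time"]):
        state = totals[tx["customer_id"]]  # PARTITION BY customer_id
        state["count"] += 1
        state["sum"] += tx["amount"]
        yield {
            "customer_id": tx["customer_id"],
            "running_count": state["count"],
            "running_sum": state["sum"],
            "running_avg": state["sum"] / state["count"],
        }


# Hypothetical sample data
sample = [
    {"customer_id": "c1", "event_time": 1, "amount": 10.0},
    {"customer_id": "c2", "event_time": 2, "amount": 5.0},
    {"customer_id": "c1", "event_time": 3, "amount": 30.0},
]
rows = list(running_aggregates(sample))
```

The state kept per customer is constant-size (a count and a sum), which is why an unbounded-preceding window like this one is cheap to maintain.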
3. State Backend Architecture:
┌─────────────────────────────────────────────────────────────────┐
│                  State Management Architecture                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│                         ┌─────────────┐                         │
│                         │   Stream    │                         │
│                         │  Processor  │                         │
│                         └──────┬──────┘                         │
│                                │                                │
│                                ▼                                │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                   State Backend Options                   │  │
│  │ ┌─────────────┬─────────────┬─────────────┬─────────────┐ │  │
│  │ │  DynamoDB   │ ElastiCache │     S3      │    Local    │ │  │
│  │ │  (Durable)  │   (Fast)    │   (Bulk)    │  (Testing)  │ │  │
│  │ └─────────────┴─────────────┴─────────────┴─────────────┘ │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
Q18: How do you implement event sourcing with AWS streaming services?
Answer:
Event Sourcing Architecture:
┌─────────────────────────────────────────────────────────────────┐
│                     Event Sourcing with AWS                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Commands            Event Store           Projections          │
│  ┌─────────┐       ┌─────────────┐       ┌─────────────┐        │
│  │   API   │──────▶│   Kinesis   │──────▶│ Read Model  │        │
│  │ Gateway │       │   Streams   │       │ (DynamoDB)  │        │
│  └─────────┘       └──────┬──────┘       └─────────────┘        │
│                           │                                     │
│                           │              ┌─────────────┐        │
│                           ├─────────────▶│  S3 (Event  │        │
│                           │              │  Archive)   │        │
│                           │              └─────────────┘        │
│                           │                                     │
│                           │              ┌─────────────┐        │
│                           └─────────────▶│  Analytics  │        │
│                                          │  (Athena)   │        │
│                                          └─────────────┘        │
└─────────────────────────────────────────────────────────────────┘
Event Store Implementation:
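import hashlib
import json
from datetime import datetime, timezone

import boto3

class EventStore:
    def __init__(self, stream_name):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name

    def append(self, aggregate_id, events):
        for event in events:
            self.kinesis.put_record(
                StreamName=self.stream_name,
                Data=json.dumps({
                    'aggregate_id': aggregate_id,
                    'event_type': event['type'],
                    'payload': event['payload'],
                    'timestamp': datetime.now(timezone.utc).isoformat(),
                    'version': event['version']
                }).encode('utf-8'),
                # Same partition key -> same shard, which preserves per-aggregate order
                PartitionKey=aggregate_id
            )

    def get_shard_iterator(self, aggregate_id):
        # Kinesis routes a record by the MD5 hash of its partition key.
        # Simplification: assumes the stream has never been resharded.
        hash_key = int(hashlib.md5(aggregate_id.encode('utf-8')).hexdigest(), 16)
        for shard in self.kinesis.list_shards(StreamName=self.stream_name)['Shards']:
            key_range = shard['HashKeyRange']
            if int(key_range['StartingHashKey']) <= hash_key <= int(key_range['EndingHashKey']):
                return self.kinesis.get_shard_iterator(
                    StreamName=self.stream_name,
                    ShardId=shard['ShardId'],
                    ShardIteratorType='TRIM_HORIZON'
                )['ShardIterator']
        raise ValueError(f'No shard found for aggregate {aggregate_id}')

    def get_events(self, aggregate_id, from_version=0):
        # Read the shard for this aggregate from the oldest retained record.
        # Events older than the stream's retention period must come from the S3 archive.
        shard_iterator = self.get_shard_iterator(aggregate_id)
        events = []
        while shard_iterator:
            records = self.kinesis.get_records(ShardIterator=shard_iterator)
            for record in records['Records']:
                event = json.loads(record['Data'])
                if event['aggregate_id'] == aggregate_id:
                    if event['version'] > from_version:
                        events.append(event)
            # An open shard always returns a NextShardIterator, so stop once caught up
            if records.get('MillisBehindLatest', 0) == 0:
                break
            shard_iterator = records.get('NextShardIterator')
        return events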
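Reading events back is only half of event sourcing; current state comes from folding those events in version order. The sketch below illustrates that replay step under stated assumptions: the `Deposited` and `Withdrawn` event types and the account-balance aggregate are hypothetical, chosen only to show the fold.

```python
def apply_event(state, event):
    """Apply one event to an account state (event types are hypothetical)."""
    balance = state["balance"]
    if event["event_type"] == "Deposited":
        balance += event["payload"]["amount"]
    elif event["event_type"] == "Withdrawn":
        balance -= event["payload"]["amount"]
    return {"balance": balance, "version": event["version"]}


def rebuild(events, initial=None):
    """Fold events into state, optionally starting from a snapshot."""
    state = initial or {"balance": 0, "version": 0}
    for event in sorted(events, key=lambda e: e["version"]):
        if event["version"] <= state["version"]:
            continue  # already applied: duplicate delivery or snapshot overlap
        state = apply_event(state, event)
    return state


events = [
    {"event_type": "Deposited", "payload": {"amount": 100}, "version": 1},
    {"event_type": "Withdrawn", "payload": {"amount": 30}, "version": 2},
    {"event_type": "Withdrawn", "payload": {"amount": 30}, "version": 2},  # duplicate
    {"event_type": "Deposited", "payload": {"amount": 5}, "version": 3},
]
current = rebuild(events)
from_snapshot = rebuild(events, initial={"balance": 70, "version": 2})
```

Skipping events at or below the current version makes replay tolerant of at-least-once delivery, and the same check lets a projection resume from a stored snapshot instead of replaying the full history.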
Q19: Explain stream processing patterns for IoT data on AWS.
Answer:
IoT Streaming Architecture:
┌─────────────────────────────────────────────────────────────────┐
│                 IoT Data Streaming Architecture                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  IoT Devices        Ingestion          Processing               │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐          │
│  │   Sensors   │───▶│  IoT Core   │───▶│   Kinesis   │          │
│  │   (MQTT)    │    │             │    │   Streams   │          │
│  └─────────────┘    └─────────────┘    └──────┬──────┘          │
│                                               │                 │
│  ┌─────────────┐    ┌─────────────┐           │                 │
│  │  Gateways   │───▶│IoT Analytics│───────────┤                 │
│  │             │    │             │           │                 │
│  └─────────────┘    └─────────────┘           │                 │
│                                               ▼                 │
│                                        ┌─────────────┐          │
│                                        │   Lambda    │          │
│                                        │  (Process)  │          │
│                                        └──────┬──────┘          │
│         ┌──────────────────┬──────────────────┤                 │
│         ▼                  ▼                  ▼                 │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐          │
│  │     S3      │    │ Timestream  │    │  DynamoDB   │          │
│  │ (Data Lake) │    │  (Metrics)  │    │   (State)   │          │
│  └─────────────┘    └─────────────┘    └─────────────┘          │
└─────────────────────────────────────────────────────────────────┘
IoT Core Rules:
-- IoT Rule to route to Kinesis
SELECT
timestamp() as event_time,
topic(2) as device_id,
temperature,
humidity,
pressure
FROM 'sensors/+/data'
WHERE temperature > 50
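As a way to reason about what this rule emits, the following sketch mimics it in plain Python. It is only an approximation for illustration: `apply_rule` is a hypothetical name, and missing attributes are returned as `None` here, whereas the rules engine would leave undefined attributes out of the result.

```python
import time


def apply_rule(topic, payload, threshold=50):
    """Return the row the rule would emit, or None if the message is filtered out."""
    parts = topic.split("/")
    # FROM 'sensors/+/data': three segments, where '+' matches any single segment
    if len(parts) != 3 or parts[0] != "sensors" or parts[2] != "data":
        return None
    # WHERE temperature > 50 (a message without a temperature does not match)
    temperature = payload.get("temperature")
    if temperature is None or temperature <= threshold:
        return None
    return {
        "event_time": int(time.time() * 1000),  # timestamp() is epoch milliseconds
        "device_id": parts[1],                  # topic(2) is the second topic segment
        "temperature": temperature,
        "humidity": payload.get("humidity"),
        "pressure": payload.get("pressure"),
    }


hot = apply_rule("sensors/device-42/data", {"temperature": 72.5, "humidity": 40})
cool = apply_rule("sensors/device-42/data", {"temperature": 21.0})
other = apply_rule("sensors/device-42/status", {"temperature": 99})
```

Filtering at the rule keeps readings at or below the threshold out of Kinesis entirely, which reduces shard throughput and downstream Lambda invocations.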
Lambda IoT Processor:
import base64
import json
from datetime import datetime, timezone

def lambda_handler(event, context):
    for record in event['Records']:
        # Kinesis delivers the payload base64-encoded
        payload = json.loads(base64.b64decode(record['kinesis']['data']))
        # Process IoT data
        processed = {
            'device_id': payload['device_id'],
            'timestamp': payload['event_time'],
            'temperature': payload['temperature'],
            'alert': payload['temperature'] > 60,
            'processed_at': datetime.now(timezone.utc).isoformat()
        }
        # Store in Timestream for metrics (store_in_timestream is an application helper, not shown)
        store_in_timestream(processed)
        # Check for alerts (send_alert is an application helper, not shown)
        if processed['alert']:
            send_alert(processed)
Q20: How do you implement data deduplication in streaming pipelines?
Answer:
Deduplication Strategies:
1. Sequence Number Tracking:
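import redis

class Deduplicator:
    def __init__(self, ttl_seconds=86400):
        self.processed_sequences = set()  # per-process cache; bound or reset it in long-lived workers
        self.redis = redis.Redis()
        self.ttl_seconds = ttl_seconds

    def is_duplicate(self, sequence_key, sequence_number):
        # Key on both values so equal sequence numbers from different sources do not collide
        dedup_key = f'processed:{sequence_key}:{sequence_number}'
        # Check memory cache
        if dedup_key in self.processed_sequences:
            return True
        # SET with NX is atomic, so only the first worker to claim the key gets True.
        # The expiry keeps Redis from growing without bound.
        first_seen = self.redis.set(dedup_key, 1, nx=True, ex=self.ttl_seconds)
        # Mark as processed locally
        self.processed_sequences.add(dedup_key)
        return not first_seen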
2. Content Hash Deduplication:
import hashlib
import json

class ContentDeduplicator:
    def __init__(self):
        self.seen_hashes = set()

    def is_duplicate(self, record):
        # Hash a canonical serialization so key order does not change the result
        content = json.dumps(record, sort_keys=True)
        content_hash = hashlib.sha256(content.encode('utf-8')).hexdigest()
        if content_hash in self.seen_hashes:
            return True
        self.seen_hashes.add(content_hash)
        return False
3. Window-Based Deduplication:
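import time

class WindowDeduplicator:
    def __init__(self, window_size=300):  # 5 minutes
        self.window_size = window_size
        self.window = {}  # key -> time the key was first seen

    def is_duplicate(self, key, timestamp=None):
        # Use a single clock for storing and expiring entries: the caller's
        # timestamp if given, otherwise the current wall-clock time
        now = time.time() if timestamp is None else timestamp
        # Clean old entries
        cutoff = now - self.window_size
        self.window = {k: v for k, v in self.window.items() if v > cutoff}
        # Check for duplicate
        if key in self.window:
            return True
        self.window[key] = now
        return False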
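All three strategies need a stable key per record. For a Lambda consumer, one option is the record's `eventID`, which has the form `<shardId>:<sequenceNumber>`. The sketch below assumes that event shape; the helper names and sample records are hypothetical. Note that this catches redelivery of the same record (for example after a Lambda retry), but not producer retries, which are stored as new records with new sequence numbers and need a producer-assigned ID or content hash instead.

```python
def dedup_key(record):
    """Build a stable key from a Lambda Kinesis event record."""
    # eventID is "<shardId>:<sequenceNumber>"; the stream ARN scopes it to one stream.
    return f"{record['eventSourceARN']}/{record['eventID']}"


def drop_duplicates(records, seen):
    """Return records whose key is not yet in `seen`, recording new keys as it goes."""
    fresh = []
    for record in records:
        key = dedup_key(record)
        if key in seen:
            continue
        seen.add(key)
        fresh.append(record)
    return fresh


# Hypothetical records; the ARN uses a placeholder account ID
arn = "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
batch = [
    {"eventSourceARN": arn, "eventID": "shardId-000000000000:1001"},
    {"eventSourceARN": arn, "eventID": "shardId-000000000000:1002"},
    {"eventSourceARN": arn, "eventID": "shardId-000000000000:1001"},  # redelivered
]
seen = set()
first_pass = drop_duplicates(batch, seen)
second_pass = drop_duplicates(batch, seen)  # the whole batch replayed
```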
Q21: Design a real-time feature store for machine learning.
Answer:
Feature Store Architecture:
┌─────────────────────────────────────────────────────────────────┐
│                     Real-Time Feature Store                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Data Sources         Feature Engineering       Feature Store   │
│  ┌─────────────┐        ┌─────────────┐        ┌─────────────┐  │
│  │ Clickstream │───────▶│   Kinesis   │───────▶│  DynamoDB   │  │
│  │             │        │  Analytics  │        │  (Online)   │  │
│  └─────────────┘        └─────────────┘        └─────────────┘  │
│                                                                 │
│  ┌─────────────┐        ┌─────────────┐        ┌─────────────┐  │
│  │Transactions │───────▶│   Lambda    │───────▶│     S3      │  │
│  │             │        │  Features   │        │  (Offline)  │  │
│  └─────────────┘        └──────┬──────┘        └──────┬──────┘  │
│                                │                      │         │
│                                ▼                      ▼         │
│                         ┌─────────────┐        ┌─────────────┐  │
│                         │  SageMaker  │◀───────│    Glue     │  │
│                         │ (Training)  │        │   Catalog   │  │
│                         └─────────────┘        └─────────────┘  │
└─────────────────────────────────────────────────────────────────┘
Feature Engineering Lambda:
import base64
import json

def lambda_handler(event, context):
    features = []
    for record in event['Records']:
        # Kinesis record data arrives base64-encoded
        data = json.loads(base64.b64decode(record['kinesis']['data']))
        # Compute real-time features (the get_* helpers are application lookups, not shown)
        feature_vector = {
            'user_id': data['user_id'],
            'timestamp': data['timestamp'],
            'features': {
                'transaction_count_1h': get_transaction_count(data['user_id'], '1h'),
                'avg_amount_24h': get_avg_amount(data['user_id'], '24h'),
                'unique_products_7d': get_unique_products(data['user_id'], '7d'),
                'last_purchase_time': get_last_purchase(data['user_id'])
            }
        }
        features.append(feature_vector)
    # Store in DynamoDB for online serving
    store_online_features(features)
    # Store in S3 for offline training
    store_offline_features(features)
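The handler above leaves helpers such as `get_transaction_count` undefined. One possible shape for the windowed ones is a per-user sliding window, sketched here in memory. The class name and its methods are assumptions for illustration; a Lambda deployment would need to keep this state in an external store such as DynamoDB or ElastiCache, because function instances do not share memory.

```python
from collections import defaultdict, deque


class SlidingWindowFeatures:
    """Per-user sliding-window count and average over recent transactions."""

    def __init__(self, window_seconds=3600):
        self.window_seconds = window_seconds
        self.events = defaultdict(deque)  # user_id -> deque of (timestamp, amount)

    def add(self, user_id, timestamp, amount):
        self.events[user_id].append((timestamp, amount))

    def features(self, user_id, now):
        window = self.events[user_id]
        # Evict events that left the window (assumes in-order arrival per user)
        while window and window[0][0] <= now - self.window_seconds:
            window.popleft()
        count = len(window)
        total = sum(amount for _, amount in window)
        return {
            "transaction_count": count,
            "avg_amount": total / count if count else 0.0,
        }


store = SlidingWindowFeatures(window_seconds=3600)
store.add("u1", 0, 20.0)
store.add("u1", 1800, 40.0)
store.add("u1", 4000, 60.0)
result = store.features("u1", now=4000)
empty = store.features("unknown-user", now=4000)
```

Computing the online and offline features from the same logic is what keeps training and serving consistent, so whichever store backs this window should feed both paths.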
Q22: How do you handle late data and out-of-order events in streaming?
Answer:
Handling Strategies:
1. Watermark-Based Processing:
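-- Tumbling 5-minute window (Flink-style SQL). How late an event may arrive is governed
-- by the watermark on event_time, which is declared in the source table definition (not shown).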
SELECT
customer_id,
COUNT(*) as event_count,
TUMBLE_START(event_time, INTERVAL '5' MINUTE) as window_start
FROM events
GROUP BY customer_id, TUMBLE(event_time, INTERVAL '5' MINUTE);
2. Lambda Architecture for Late Data:
┌─────────────────────────────────────────────────────────────────┐
│                Lambda Architecture for Late Data                │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Real-Time Path (Speed Layer)                                   │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐          │
│  │   Stream    │───▶│  Real-Time  │───▶│  Real-Time  │          │
│  │   Events    │    │ Processing  │    │    View     │          │
│  └─────────────┘    └─────────────┘    └─────────────┘          │
│                                                                 │
│  Batch Path (Batch Layer)                                       │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐          │
│  │     All     │───▶│    Batch    │───▶│    Batch    │          │
│  │   Events    │    │ Processing  │    │    View     │          │
│  │    (S3)     │    │    (EMR)    │    │             │          │
│  └─────────────┘    └─────────────┘    └─────────────┘          │
│                                                                 │
│  Serving Layer                                                  │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │              Unified View (Redshift/Athena)               │  │
│  └───────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘
3. Out-of-Order Buffer:
import time

class OutOfOrderBuffer:
    # handle_late_event, get_window_key and process_complete_windows are
    # application-specific methods that are not shown here
    def __init__(self, max_delay_seconds=300):
        self.max_delay = max_delay_seconds
        self.buffer = {}  # window key -> list of events

    def add_event(self, event):
        event_time = event['timestamp']  # epoch seconds
        current_time = time.time()
        # Check if too late
        if current_time - event_time > self.max_delay:
            # Send to late data handler
            self.handle_late_event(event)
            return
        # Add to buffer
        window_key = self.get_window_key(event_time)
        if window_key not in self.buffer:
            self.buffer[window_key] = []
        self.buffer[window_key].append(event)
        # Process complete windows
        self.process_complete_windows()
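The buffer above depends on helper methods that are not shown. As a self-contained illustration of the same idea, here is a sketch that tracks a watermark in event time (the highest timestamp seen minus an allowed delay) rather than comparing against the wall clock. `WatermarkBuffer` and its parameters are hypothetical names, not part of any AWS API.

```python
from collections import defaultdict


class WatermarkBuffer:
    """Tumbling windows that close once the watermark passes their end."""

    def __init__(self, window_size=60, max_delay=30):
        self.window_size = window_size
        self.max_delay = max_delay
        self.windows = defaultdict(list)  # window start -> buffered events
        self.watermark = float("-inf")
        self.late = []                    # events whose window already closed

    def add_event(self, event):
        """Buffer one event and return the (window_start, events) pairs that closed."""
        ts = event["timestamp"]
        start = ts - ts % self.window_size
        if start + self.window_size <= self.watermark:
            self.late.append(event)  # route to a late-data path (for example, batch)
            return []
        self.windows[start].append(event)
        # The watermark trails the highest event time seen by max_delay
        self.watermark = max(self.watermark, ts - self.max_delay)
        closed = sorted(s for s in self.windows if s + self.window_size <= self.watermark)
        return [
            (s, sorted(self.windows.pop(s), key=lambda e: e["timestamp"]))
            for s in closed
        ]


buf = WatermarkBuffer(window_size=60, max_delay=30)
emitted = [buf.add_event({"timestamp": ts}) for ts in (10, 50, 5, 95, 40)]
```

In this run the out-of-order event at timestamp 5 still lands in the first window because the watermark has not passed it, while the event at 40 arrives after that window was emitted and goes to the late list. Raising `max_delay` trades result latency for fewer late events.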
Q23: Explain exactly-once semantics with Kinesis and Step Functions.
Answer:
Step Functions Orchestration:
┌─────────────────────────────────────────────────────────────────┐
│                Exactly-Once with Step Functions                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────┐                                                │
│  │    Start    │                                                │
│  └──────┬──────┘                                                │
│         │                                                       │
│         ▼                                                       │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐          │
│  │ Get Records │───▶│   Process   │───▶│    Write    │          │
│  │  (Kinesis)  │    │   Records   │    │   Results   │          │
│  └─────────────┘    └─────────────┘    └──────┬──────┘          │
│                                               │                 │
│                                        Success│                 │
│                                               ▼                 │
│                                        ┌─────────────┐          │
│                                        │   Update    │          │
│                                        │ Checkpoint  │          │
│                                        └─────────────┘          │
└─────────────────────────────────────────────────────────────────┘
Step Functions Definition:
{
  "StartAt": "GetKinesisRecords",
  "States": {
    "GetKinesisRecords": {
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:kinesis:getRecords",
      "Parameters": {
        "ShardIterator.$": "$.shard_iterator",
        "Limit": 100
      },
      "ResultPath": "$.batch",
      "Next": "ProcessRecords"
    },
    "ProcessRecords": {
      "Type": "Map",
      "ItemsPath": "$.batch.Records",
      "MaxConcurrency": 10,
      "Iterator": {
        "StartAt": "ProcessSingleRecord",
        "States": {
          "ProcessSingleRecord": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Parameters": {
              "FunctionName": "process-record",
              "Payload.$": "$"
            },
            "Retry": [
              {
                "ErrorEquals": ["States.TaskFailed"],
                "IntervalSeconds": 1,
                "MaxAttempts": 3,
                "BackoffRate": 2
              }
            ],
            "End": true
          }
        }
      },
      "ResultPath": "$.results",
      "Next": "Checkpoint"
    },
    "Checkpoint": {
      "Type": "Task",
      "Resource": "arn:aws:states:::dynamodb:putItem",
      "Parameters": {
        "TableName": "checkpoints",
        "Item": {
          "stream_name": {"S": "my-stream"},
          "shard_id": {"S.$": "$.shard_id"},
          "sequence_number": {"S.$": "$.batch.Records[-1].SequenceNumber"}
        }
      },
      "End": true
    }
  }
}
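Checkpointing after the write gives at-least-once delivery: if the execution fails between the write and the checkpoint, the batch is read again. The usual way to turn that into an exactly-once effect is to make the write idempotent. The sketch below shows the idea with an in-memory stand-in for the sink; the class and function names are hypothetical. With DynamoDB, the same guard could be a conditional put using `attribute_not_exists` on the key.

```python
class IdempotentSink:
    """In-memory stand-in for a store that supports conditional writes."""

    def __init__(self):
        self.applied = set()  # idempotency keys already written
        self.total = 0

    def write(self, key, amount):
        """Apply the write once per key; return False if it was already applied."""
        if key in self.applied:
            return False
        self.applied.add(key)
        self.total += amount
        return True


def process_batch(sink, shard_id, records):
    """Write every record and return the sequence number to checkpoint."""
    last = None
    for record in records:
        # Shard ID plus sequence number identifies a record within a stream
        sink.write(f"{shard_id}:{record['sequence_number']}", record["amount"])
        last = record["sequence_number"]
    return last


sink = IdempotentSink()
batch = [
    {"sequence_number": "101", "amount": 5},
    {"sequence_number": "102", "amount": 7},
]
process_batch(sink, "shardId-000000000000", batch)
# Simulate a failure before the checkpoint was saved: the same batch is replayed
checkpoint = process_batch(sink, "shardId-000000000000", batch)
```

After the replay the total is still 12, because the second pass finds both keys already applied.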
Q24: How do you implement stream processing with schema validation?
Answer:
Schema Validation Layer:
import base64
import json
from datetime import datetime, timezone

from jsonschema import validate, ValidationError

# Define schema
transaction_schema = {
    "type": "object",
    "properties": {
        "transaction_id": {"type": "string"},
        "amount": {"type": "number", "minimum": 0},
        "currency": {"type": "string", "pattern": "^[A-Z]{3}$"},
        "timestamp": {"type": "integer"}
    },
    "required": ["transaction_id", "amount", "currency", "timestamp"]
}

class SchemaValidator:
    def __init__(self, schema):
        self.schema = schema

    def validate(self, record):
        # Returns (is_valid, error_message)
        try:
            validate(instance=record, schema=self.schema)
            return True, None
        except ValidationError as e:
            return False, str(e)

# Usage in Lambda
def lambda_handler(event, context):
    validator = SchemaValidator(transaction_schema)
    valid_records = []
    invalid_records = []
    for record in event['Records']:
        # Kinesis record data arrives base64-encoded
        data = json.loads(base64.b64decode(record['kinesis']['data']))
        is_valid, error = validator.validate(data)
        if is_valid:
            valid_records.append(data)
        else:
            invalid_records.append({
                'record': data,
                'error': error,
                'timestamp': datetime.now(timezone.utc).isoformat()
            })
    # Process valid records (process_valid_records is an application helper, not shown)
    process_valid_records(valid_records)
    # Send invalid records to DLQ (send_to_dlq is an application helper, not shown)
    if invalid_records:
        send_to_dlq(invalid_records)
Glue Schema Registry:
# Use Glue Schema Registry for schema validation
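import json

import boto3
from jsonschema import validate, ValidationError

glue = boto3.client('glue')

class SchemaValidationError(Exception):
    pass

def process_with_schema_validation(record):
    # Get schema from registry. Assumes record['schema_version'] holds a Glue
    # SchemaVersionId and that the registered schema is a JSON Schema document.
    response = glue.get_schema_version(SchemaVersionId=record['schema_version'])
    schema = json.loads(response['SchemaDefinition'])
    # Validate
    try:
        validate(instance=record['data'], schema=schema)
    except ValidationError as e:
        raise SchemaValidationError("Record doesn't match schema") from e
    # Process (process_record is an application helper, not shown)
    return process_record(record)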
Q25: Design a cost-optimized streaming architecture.
Answer:
Cost Optimization Strategies:
┌─────────────────────────────────────────────────────────────────┐
│              Cost-Optimized Streaming Architecture              │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Tier 1: Hot Path (Real-Time)                                   │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │ • Kinesis Data Streams (shard optimization)               │  │
│  │ • Lambda (right-sized memory)                             │  │
│  │ • DynamoDB (on-demand for variable workloads)             │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                 │
│  Tier 2: Warm Path (Near Real-Time)                             │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │ • Kinesis Data Firehose (batching)                        │  │
│  │ • S3 (Standard → IA after 30 days)                        │  │
│  │ • Athena (pay-per-query)                                  │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                 │
│  Tier 3: Cold Path (Batch)                                      │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │ • S3 Glacier (archival)                                   │  │
│  │ • EMR Spot instances                                      │  │
│  │ • Redshift Reserved instances                             │  │
│  └───────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘
Cost Calculation Example:
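import math

# Estimate streaming costs (illustrative list prices; check current regional pricing)
def estimate_monthly_cost(
    records_per_second,
    average_record_size_kb,
    retention_hours=24
):
    hours_per_month = 730
    seconds_per_month = hours_per_month * 3600
    # Kinesis costs: a shard accepts 1 MB/s or 1,000 records/s, whichever is reached first
    mb_per_second = records_per_second * average_record_size_kb / 1024
    shard_count = max(1, math.ceil(mb_per_second), math.ceil(records_per_second / 1000))
    kinesis_cost = shard_count * 0.015 * hours_per_month
    # Not modeled: PUT payload unit charges, and retention beyond the default 24 hours
    # (retention_hours is accepted for completeness but does not change the estimate)
    # Lambda costs: upper bound of one invocation per record; batching lowers the
    # request count, and duration charges are not included
    monthly_requests = records_per_second * seconds_per_month
    lambda_cost = (monthly_requests / 1000000) * 0.20
    # DynamoDB costs (if using for state): one on-demand write unit per record up to 1 KB
    write_units = records_per_second * seconds_per_month
    dynamodb_cost = (write_units / 1000000) * 1.25
    return {
        'kinesis': kinesis_cost,
        'lambda': lambda_cost,
        'dynamodb': dynamodb_cost,
        'total': kinesis_cost + lambda_cost + dynamodb_cost
    }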
Shard Optimization:
import math

# Optimize shard count based on throughput
def optimize_shards(records_per_second, avg_record_size_bytes, consumers=1):
    # Calculate required throughput
    required_throughput_mb = (records_per_second * avg_record_size_bytes) / (1024 * 1024)
    # Each shard handles 1 MB/s or 1,000 records/s on write
    write_shards = max(math.ceil(required_throughput_mb), math.ceil(records_per_second / 1000))
    # Each shard serves 2 MB/s on read, shared by all polling consumers
    read_shards = math.ceil(required_throughput_mb * consumers / 2)
    return max(1, write_shards, read_shards)
Key Interview Point: Always discuss cost optimization in streaming. Mention shard splitting/merging for Kinesis, reserved capacity for DynamoDB, and S3 lifecycle policies for data tiering.
Summary
Mastering AWS streaming analytics requires understanding:
- Service Selection: Kinesis vs MSK vs Firehose based on use case
- Processing Patterns: Exactly-once, windowing, state management
- Architecture Design: Multi-region, cost optimization, monitoring
- Data Quality: Schema validation, deduplication, late data handling
- Real-Time ML: Feature stores, anomaly detection, online inference
These concepts form the foundation for building scalable, reliable, and cost-effective real-time data processing systems on AWS.