Streaming Pipeline Architecture
Architecture Diagram
```text
REAL-TIME STREAMING PIPELINE: Kinesis → Lambda → S3 → Athena

DATA SOURCES (Producers)
  IoT Sensors | Web App Events | Mobile Events | Log Streams
      │
      ▼
KINESIS DATA STREAMS (Ingestion)
  Shard 0 | Shard 1 | Shard 2 | Shard 3
  Each shard: 1 MB/s in, 2 MB/s out
  Retention: 24 hours by default (extendable to 365 days)
      │
      ▼
LAMBDA (Processing)
  Event Source Mapping
    • Batch size: 100-1000 records
    • Maximum batching window: 60 seconds
    • Parallelization factor: 10
  Processing Logic
    • Parse and validate records
    • Enrich with reference data
    • Aggregate (windowed)
    • Filter invalid records
      │
      ▼
S3 (Storage)
  s3://realtime-data-lake/
  ├── raw/                (Ingested records)
  │   └── {date}/{hour}/  (Hourly partitions)
  ├── processed/          (Transformed)
  │   └── {date}/{hour}/  (Hourly partitions)
  └── aggregated/         (Windowed aggregates)
      └── {date}/{hour}/  (Hourly aggregates)
  Format: Parquet (compressed)
      │
      ├──► Athena (Ad-hoc queries)
      ├──► QuickSight (Dashboards)
      └──► Lambda (Alerts)

MONITORING
  CloudWatch metrics: IteratorAge (Lambda), GetRecords.IteratorAgeMilliseconds (Kinesis)
  CloudWatch alarms: Throttles, Errors, IteratorAge > threshold
```
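The event source mapping settings in the diagram correspond to parameters of Lambda's CreateEventSourceMapping API. A minimal sketch, assuming a boto3 Lambda client is passed in and that the stream ARN and function name are placeholders; the batch size of 500 is an arbitrary pick from the 100-1000 range, and `build_mapping_config`/`create_mapping` are hypothetical helper names.

```python
def build_mapping_config(stream_arn: str, function_name: str) -> dict:
    """Keyword arguments for boto3's lambda create_event_source_mapping call."""
    return {
        "EventSourceArn": stream_arn,
        "FunctionName": function_name,
        # Required for Kinesis; TRIM_HORIZON would start from the oldest retained record
        "StartingPosition": "LATEST",
        # Chosen from the 100-1000 range shown in the diagram
        "BatchSize": 500,
        # Wait up to 60 s to gather a batch before invoking the function
        "MaximumBatchingWindowInSeconds": 60,
        # Up to 10 concurrent batches per shard (10 is the maximum allowed)
        "ParallelizationFactor": 10,
    }


def create_mapping(lambda_client, stream_arn: str, function_name: str) -> str:
    """Create the mapping with a boto3 Lambda client and return its UUID."""
    response = lambda_client.create_event_source_mapping(
        **build_mapping_config(stream_arn, function_name)
    )
    return response["UUID"]
```

In practice this would be called as `create_mapping(boto3.client("lambda"), stream_arn, function_name)`; keeping the configuration in a plain dictionary makes it easy to review or reuse in infrastructure-as-code.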
Lambda Processing Code
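```python
import base64
import json
import logging
import uuid
from datetime import datetime, timezone
from io import BytesIO

import boto3
import pandas as pd  # not in the Lambda runtime by default: ship pandas + pyarrow in a layer

s3 = boto3.client('s3')
logger = logging.getLogger()
BUCKET = 'realtime-data-lake'


def lambda_handler(event, context):
    """Process a batch of Kinesis records and write the valid ones to S3."""
    processed_records = []
    for record in event['Records']:
        # Kinesis payloads arrive base64-encoded inside the Lambda event
        try:
            data = json.loads(base64.b64decode(record['kinesis']['data']))
        except (ValueError, TypeError):
            # Filter malformed records instead of failing (and retrying) the whole batch
            logger.warning("Skipping malformed record %s", record['kinesis']['sequenceNumber'])
            continue
        arrival = datetime.fromtimestamp(
            record['kinesis']['approximateArrivalTimestamp'], tz=timezone.utc
        )
        processed = process_record(data, arrival)
        if processed is not None:
            processed_records.append(processed)

    # Write the whole batch to S3 as one object
    if processed_records:
        write_to_s3(processed_records)
    return {'processed': len(processed_records)}


def process_record(data, arrival):
    """Validate and transform one record; return None if it has no event_id."""
    if not isinstance(data, dict) or not data.get('event_id'):
        return None
    return {
        'event_id': data['event_id'],
        'user_id': data.get('user_id'),
        'event_type': data.get('event_type'),
        # When the event reached Kinesis, not when Lambda processed it
        'timestamp': arrival.isoformat(),
        'processed_at': datetime.now(timezone.utc).isoformat(),
    }


def write_to_s3(records):
    """Write a batch of records to S3 as one Snappy-compressed Parquet object."""
    df = pd.DataFrame(records)
    now = datetime.now(timezone.utc)
    # Hive-style hourly partitions let Athena prune by year/month/day/hour.
    # The UUID suffix stops concurrent invocations (ParallelizationFactor > 1)
    # from overwriting each other's files written in the same second.
    key = (
        f"processed/year={now.year}/month={now.month:02d}/day={now.day:02d}/hour={now.hour:02d}/"
        f"{now:%Y%m%d%H%M%S}-{uuid.uuid4().hex}.parquet"
    )
    buffer = BytesIO()
    df.to_parquet(buffer, index=False, compression='snappy')  # needs pyarrow or fastparquet
    s3.put_object(Bucket=BUCKET, Key=key, Body=buffer.getvalue())
```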
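The "Aggregate (windowed)" step from the diagram is not implemented in the handler. A minimal sketch of a tumbling-window count per `event_type`, assuming records shaped like the output of `process_record` (an ISO-8601 `timestamp` with a UTC offset) and a 60-second window chosen for illustration; `window_start` and `aggregate_tumbling` are hypothetical names. It only aggregates within a single batch; windows that span invocations would need state carried between them, for example via Lambda's tumbling-window option for Kinesis event sources or an external store.

```python
from collections import Counter
from datetime import datetime, timezone


def window_start(ts: datetime, window_seconds: int = 60) -> datetime:
    """Floor a timezone-aware timestamp to the start of its tumbling window (UTC)."""
    epoch = int(ts.timestamp())
    return datetime.fromtimestamp(epoch - epoch % window_seconds, tz=timezone.utc)


def aggregate_tumbling(records, window_seconds: int = 60):
    """Count records per (window start, event_type) within one batch."""
    counts = Counter()
    for rec in records:
        ts = datetime.fromisoformat(rec["timestamp"])
        counts[(window_start(ts, window_seconds), rec.get("event_type"))] += 1
    # One row per window and event type, ready to write under aggregated/
    return [
        {"window_start": start.isoformat(), "event_type": etype, "count": n}
        for (start, etype), n in sorted(
            counts.items(), key=lambda kv: (kv[0][0], str(kv[0][1]))
        )
    ]
```

Two events at 10:00:05 and 10:00:55 fall into the same 10:00:00 window, while one at 10:01:10 opens the next window.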
Interview Q&A
Q1: What is IteratorAge in Kinesis?
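Answer: IteratorAge measures how far a consumer lags behind the stream. For a Lambda consumer, it is the time between when the last record in a batch was written to the stream and when that batch was sent to the function; the stream-side equivalent is GetRecords.IteratorAgeMilliseconds. A steadily rising iterator age means the consumer is falling behind, and once it approaches the retention period, unprocessed records risk expiring.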
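To make the definition concrete, a function can estimate the age of its own batch from each record's `approximateArrivalTimestamp` (epoch seconds) in the Kinesis event. A minimal sketch, assuming the standard Lambda Kinesis event shape; `batch_iterator_age_ms` is a hypothetical helper, and the value only approximates the CloudWatch metric because it is measured when the code runs rather than when the batch was dispatched.

```python
import time
from typing import Optional


def batch_iterator_age_ms(event: dict, now: Optional[float] = None) -> float:
    """Approximate age in ms of the last record in a Kinesis batch."""
    now = time.time() if now is None else now
    # Records within a shard's batch are ordered, so the last one is the newest
    last_arrival = event["Records"][-1]["kinesis"]["approximateArrivalTimestamp"]
    return (now - last_arrival) * 1000.0
```

Logging this value per invocation makes it easier to correlate lag with specific slow batches.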
Q2: How do you handle backpressure in streaming?
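Answer: Increase how fast the consumer drains each shard: raise the parallelization factor (up to 10 concurrent batches per shard), tune batch size and batching window so each invocation handles more records, shorten Lambda execution time, or use Kinesis Enhanced Fan-Out, which gives each registered consumer a dedicated 2 MB/s per shard instead of sharing the shard's read throughput. A longer batching window trades added latency for fewer invocations.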
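Some of these levers can be reasoned about with rough arithmetic. A back-of-the-envelope sketch, assuming the per-shard write limits (1 MB/s and 1,000 records/s), that the parallelization factor caps concurrent batches per shard, and that every batch is full; the average duration and batch size are hypothetical inputs you would measure. If the estimated consumer rate stays below the producer rate, iterator age keeps growing.

```python
import math


def shards_needed(ingest_mb_per_s: float, records_per_s: float) -> int:
    """Minimum shards for the write side: 1 MB/s and 1,000 records/s per shard."""
    return max(1, math.ceil(ingest_mb_per_s / 1.0), math.ceil(records_per_s / 1000.0))


def max_consumer_rate(shards: int, parallelization_factor: int,
                      batch_size: int, avg_duration_s: float) -> float:
    """Upper bound on records/s Lambda can process, assuming full batches."""
    concurrent_batches = shards * parallelization_factor
    return concurrent_batches * batch_size / avg_duration_s
```

For example, 3.2 MB/s at 2,500 records/s needs 4 shards; with a factor of 10, batches of 500, and 2 s per invocation, the ceiling is 10,000 records/s, and halving the duration doubles it.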
Q3: What is the difference between at-least-once and exactly-once delivery?
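Answer: At-least-once delivery guarantees no record is lost but may process some records more than once, for example when Lambda retries a failed batch. Exactly-once means each record affects the output exactly once. Kinesis with Lambda delivers at-least-once, so exactly-once results come from idempotent processing or from deduplicating on a unique key such as event_id (for example, with a DynamoDB conditional write).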
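One way to make the S3 write idempotent, as an alternative to DynamoDB deduplication, is to derive the object key from the batch itself instead of from the current time, so a retried batch overwrites its earlier output rather than adding a duplicate file. A sketch assuming the standard Kinesis event fields `eventID` (`shardId-...:sequenceNumber`) and `kinesis.approximateArrivalTimestamp`; `idempotent_key` is a hypothetical helper. It only covers exact retries: if a batch is split (for example with BisectBatchOnFunctionError) or re-read with different boundaries, the keys change, so record-level deduplication on event_id is still needed for strict guarantees.

```python
import hashlib
from datetime import datetime, timezone


def idempotent_key(records: list) -> str:
    """Deterministic S3 key for a Kinesis batch: the same batch yields the same key."""
    first, last = records[0], records[-1]
    # eventID combines shard ID and sequence number, so it is unique per record
    digest = hashlib.sha256(
        f"{first['eventID']}|{last['eventID']}".encode()
    ).hexdigest()[:16]
    # Partition by the first record's arrival time, not processing time,
    # so a retry an hour later still targets the same partition
    ts = datetime.fromtimestamp(first["kinesis"]["approximateArrivalTimestamp"], tz=timezone.utc)
    return (f"processed/year={ts.year}/month={ts.month:02d}/day={ts.day:02d}/"
            f"hour={ts.hour:02d}/{digest}.parquet")
```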
Summary
- Architecture: Kinesis → Lambda → S3 → Athena/QuickSight
- Key Metrics: IteratorAge, Throttles, Errors
- Best Practices: Idempotent processing, partition by time, use Parquet
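- Cost Optimization: Right-size batch size and batching window (fewer, fuller invocations); reserve provisioned concurrency for latency-critical paths, since it reduces cold starts but adds cost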
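Partitioning by time pays off at query time: a query restricted to a time range only needs the matching hourly prefixes. A small sketch that enumerates those prefixes for the Hive-style `processed/year=/month=/day=/hour=` layout used in the Lambda code; `hour_prefixes` is a hypothetical helper and expects timezone-aware datetimes. Athena performs this pruning itself when the table is partitioned on these columns and the query filters on them.

```python
from datetime import datetime, timedelta, timezone


def hour_prefixes(start: datetime, end: datetime, root: str = "processed") -> list:
    """S3 prefixes of every hourly partition overlapping [start, end)."""
    if start >= end:
        return []
    # Floor the start to the top of its hour, in UTC
    current = start.astimezone(timezone.utc).replace(minute=0, second=0, microsecond=0)
    end = end.astimezone(timezone.utc)
    prefixes = []
    while current < end:
        prefixes.append(
            f"{root}/year={current.year}/month={current.month:02d}/"
            f"day={current.day:02d}/hour={current.hour:02d}/"
        )
        current += timedelta(hours=1)
    return prefixes
```

A 24-hour query therefore touches 24 hourly prefixes, however many months of data the bucket holds.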