Kinesis Architecture Overview
┌─────────────────────────────────────────────────────────────────────────────┐
│                       KINESIS STREAMING ARCHITECTURE                        │
│                                                                             │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                             DATA SOURCES                              │  │
│  │       ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐       │  │
│  │       │   IoT    │   │  Mobile  │   │   Web    │   │   Logs   │       │  │
│  │       │ Devices  │   │   Apps   │   │   Apps   │   │  Files   │       │  │
│  │       └────┬─────┘   └────┬─────┘   └────┬─────┘   └────┬─────┘       │  │
│  └────────────┼──────────────┼──────────────┼──────────────┼─────────────┘  │
│               ▼              ▼              ▼              ▼                │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │  KINESIS DATA STREAMS (shard-based, ordered per shard)                │  │
│  │  Shard 0 (1 MB/s in, 2 MB/s out)   Shard 1 (1 MB/s in, 2 MB/s out)    │  │
│  │  Retention: 24 hours by default (up to 365 days)                      │  │
│  └───────────────────────────────────┬───────────────────────────────────┘  │
│             ┌────────────────────────┼────────────────────────┐             │
│             ▼                        ▼                        ▼             │
│    ┌─────────────────┐      ┌─────────────────┐      ┌─────────────────┐    │
│    │    Firehose     │      │    Analytics    │      │  KCL / Lambda   │    │
│    │   (Delivery)    │      │   (SQL/Flink)   │      │  (Custom Apps)  │    │
│    └────────┬────────┘      └────────┬────────┘      └────────┬────────┘    │
│             ▼                        ▼                        ▼             │
│    ┌─────────────────┐      ┌─────────────────┐      ┌─────────────────┐    │
│    │  S3 / Redshift  │      │   Lambda / S3   │      │     Custom      │    │
│    │   OpenSearch    │      │                 │      │    Consumers    │    │
│    └─────────────────┘      └─────────────────┘      └─────────────────┘    │
└─────────────────────────────────────────────────────────────────────────────┘
Kinesis Data Streams
Stream Configuration
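import boto3
kinesis = boto3.client('kinesis')
# Option 1: provisioned mode with a fixed shard count
kinesis.create_stream(
    StreamName='sensor-data-stream',
    ShardCount=4,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)
# Option 2: on-demand mode (capacity scales automatically; omit ShardCount).
# Use one option or the other: stream names are unique per account and Region,
# so calling create_stream twice with the same name fails.
# kinesis.create_stream(
#     StreamName='sensor-data-stream',
#     StreamModeDetails={'StreamMode': 'ON_DEMAND'}
# )
# create_stream returns before the stream is ready; wait until it is ACTIVE
kinesis.get_waiter('stream_exists').wait(StreamName='sensor-data-stream')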
Producer Example
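import json
def put_records(stream_name, records):
    # Reuses the `kinesis` client created above; PutRecords accepts up to 500 records per call
    entries = [
        {'Data': json.dumps(r).encode('utf-8'), 'PartitionKey': r['sensor_id']}
        for r in records
    ]
    response = kinesis.put_records(StreamName=stream_name, Records=entries)
    # PutRecords is not atomic: some records can fail (e.g. throttling) while others succeed.
    # Results come back in request order, and failed ones carry an ErrorCode.
    failed = [e for e, res in zip(entries, response['Records']) if 'ErrorCode' in res]
    if failed:
        print(f"Failed: {len(failed)} records (resend them with backoff)")
    return failed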
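Because PutRecords can partially fail, producers usually resend only the failed entries. The sketch below keeps that logic offline-testable: it splits entries into batches of at most 500 records and calls `send_batch`, a hypothetical caller-supplied function standing in for `kinesis.put_records` that returns the per-record results in request order. The attempt count and backoff delays are illustrative assumptions, not AWS recommendations.

```python
import time
from typing import Callable, Dict, List

MAX_RECORDS_PER_CALL = 500  # PutRecords accepts at most 500 records per request

def chunked(entries: List[Dict], size: int = MAX_RECORDS_PER_CALL) -> List[List[Dict]]:
    """Split entries into consecutive batches of at most `size` records."""
    return [entries[i:i + size] for i in range(0, len(entries), size)]

def put_with_retry(entries: List[Dict],
                   send_batch: Callable[[List[Dict]], List[Dict]],
                   max_attempts: int = 5,
                   base_delay: float = 0.1) -> List[Dict]:
    """Send entries in batches and resend only the entries that failed.

    `send_batch` is a hypothetical wrapper returning per-record results in
    request order, e.g.
        lambda batch: kinesis.put_records(StreamName=name, Records=batch)['Records']
    Returns the entries that still failed after `max_attempts` rounds.
    """
    pending = list(entries)
    for attempt in range(max_attempts):
        failed = []
        for batch in chunked(pending):
            results = send_batch(batch)
            # A result with an ErrorCode marks a record that was not written
            failed.extend(e for e, r in zip(batch, results) if 'ErrorCode' in r)
        if not failed:
            return []
        pending = failed
        if attempt < max_attempts - 1:
            time.sleep(base_delay * 2 ** attempt)  # exponential backoff between rounds
    return pending
```

Resending failed entries can place them after later records with the same partition key. If strict per-key order matters, `PutRecord` with `SequenceNumberForOrdering` is the documented way to enforce it.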
KCL Consumer
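import json
import sys
from amazon_kclpy import kcl
from amazon_kclpy.v3 import processor
class RecordProcessor(processor.RecordProcessorBase):
    def initialize(self, initialize_input): pass
    def process_records(self, process_records_input):
        for record in process_records_input.records:
            data = json.loads(record.binary_data)
            sys.stderr.write(f"Processing: {data['sensor_id']}\n")  # stdout is reserved for the KCL daemon protocol
        process_records_input.checkpointer.checkpoint()  # record progress after the batch
    def lease_lost(self, lease_lost_input): pass  # another worker owns the shard now; do not checkpoint
    def shard_ended(self, shard_ended_input):
        shard_ended_input.checkpointer.checkpoint()  # required when a shard closes after resharding
    def shutdown_requested(self, shutdown_requested_input):
        shutdown_requested_input.checkpointer.checkpoint()
if __name__ == '__main__':
    kcl.KCLProcess(RecordProcessor()).run()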
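KCL delivers records at least once: after a worker restart or a lease moving to another worker, records processed since the last checkpoint are delivered again. One common mitigation is to make the write idempotent. The sketch below uses a hypothetical `IdempotentSink` (not part of amazon_kclpy) keyed by shard ID and sequence number, with a dict standing in for a durable store. In amazon_kclpy the values would come from `initialize_input.shard_id` and each record's `sequence_number`; confirm these against the library version you use.

```python
from typing import Dict, Iterable, Tuple

class IdempotentSink:
    """Hypothetical sink keyed by (shard_id, sequence_number).

    A dict stands in for a durable store, such as a table whose primary key is
    that pair. Writing a replayed record overwrites the same key instead of
    adding a duplicate. If producers use KPL aggregation, several user records
    share one sequence number, so the sub-sequence number would join the key.
    """

    def __init__(self) -> None:
        self.rows: Dict[Tuple[str, str], dict] = {}

    def write(self, shard_id: str, sequence_number: str, payload: dict) -> bool:
        """Store the payload; return True if the key was new, False for a replay."""
        key = (shard_id, sequence_number)
        is_new = key not in self.rows
        self.rows[key] = payload
        return is_new

def process_batch(sink: IdempotentSink, shard_id: str,
                  records: Iterable[Tuple[str, dict]]) -> int:
    """Write (sequence_number, payload) pairs and return how many were new."""
    return sum(sink.write(shard_id, seq, payload) for seq, payload in records)
```

Replaying the same batch after a failover then leaves the store unchanged.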
ℹ️ Pro Tip: Use ON_DEMAND mode for unpredictable traffic. Use PROVISIONED for predictable workloads to control costs.
Kinesis Firehose
┌─────────────────────────────────────────────────────────────────────────────┐
│                        KINESIS FIREHOSE ARCHITECTURE                        │
│                                                                             │
│  Stream/Direct ──► Firehose ──► Lambda transform (optional) ──► Destination │
│                                                                             │
│  DESTINATIONS:                                                              │
│  ┌────────────┐   ┌────────────┐   ┌────────────┐   ┌────────────┐          │
│  │     S3     │   │  Redshift  │   │ OpenSearch │   │    HTTP    │          │
│  │            │   │   (COPY)   │   │            │   │  endpoint  │          │
│  └────────────┘   └────────────┘   └────────────┘   └────────────┘          │
│                                                                             │
│  BUFFERING: size 1-128 MB, interval 0-900 s (S3 default: 5 MB / 300 s)      │
│  COMPRESSION: GZIP, Snappy, ZIP                                             │
└─────────────────────────────────────────────────────────────────────────────┘
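Firehose delivers a buffer when either buffering hint is reached, whichever comes first. The following is a simplified model of that rule. It assumes time-ordered arrivals with known sizes and measures the interval from the oldest buffered record. Real Firehose timing is only approximate.

```python
from typing import List, Optional, Tuple

MB = 1024 * 1024

def simulate_flushes(arrivals: List[Tuple[float, int]],
                     size_hint_bytes: int = 5 * MB,
                     interval_s: float = 300.0) -> List[Tuple[float, int, str]]:
    """Return (flush_time_s, buffered_bytes, reason) for each simulated delivery.

    `arrivals` is a time-ordered list of (arrival_time_s, record_size_bytes).
    """
    flushes: List[Tuple[float, int, str]] = []
    buffered = 0
    opened_at: Optional[float] = None  # arrival time of the oldest buffered record
    for t, size in arrivals:
        # The interval ran out before this record arrived: deliver the old buffer
        if opened_at is not None and t - opened_at >= interval_s:
            flushes.append((opened_at + interval_s, buffered, 'interval'))
            buffered, opened_at = 0, None
        if opened_at is None:
            opened_at = t
        buffered += size
        # The size hint was reached: deliver immediately
        if buffered >= size_hint_bytes:
            flushes.append((t, buffered, 'size'))
            buffered, opened_at = 0, None
    # Whatever remains is delivered when its interval expires
    if opened_at is not None:
        flushes.append((opened_at + interval_s, buffered, 'interval'))
    return flushes
```

With large size hints such as 64 MB and a 300 s interval, a low-volume stream almost always flushes on the interval, so objects land roughly every five minutes.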
Firehose to S3 with Partitioning
firehose = boto3.client('firehose')
response = firehose.create_delivery_stream(
    DeliveryStreamName='sensor-data-firehose',
    DeliveryStreamType='KinesisStreamAsSource',
    KinesisStreamSourceConfiguration={
        'KinesisStreamARN': 'arn:aws:kinesis:us-east-1:123456789012:stream/sensor-data-stream',
        # This role needs read access to the source stream
        'RoleARN': 'arn:aws:iam::123456789012:role/FirehoseRole'
    },
    ExtendedS3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::123456789012:role/FirehoseRole',
        'BucketARN': 'arn:aws:s3:::data-lake-raw',
        # !{timestamp:...} uses the approximate arrival time in Firehose (UTC by default),
        # not the event_time field inside each record
        'Prefix': 'sensor-data/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/',
        # Keeps failed records apart by failure type (e.g. processing-failed)
        'ErrorOutputPrefix': 'errors/!{firehose:error-output-type}/',
        # Deliver at 64 MB or 300 s, whichever comes first
        'BufferingHints': {'SizeInMBs': 64, 'IntervalInSeconds': 300},
        'CompressionFormat': 'UNCOMPRESSED',
        'ProcessingConfiguration': {
            'Enabled': True,
            'Processors': [{
                'Type': 'Lambda',
                'Parameters': [{'ParameterName': 'LambdaArn', 'ParameterValue': 'arn:aws:lambda:...'}]
            }]
        }
    }
)
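The `ProcessingConfiguration` above sends records through a Lambda function (its ARN is elided in the original). A minimal handler sketch follows. It assumes the JSON sensor records written by the producer. Firehose passes base64-encoded `data` for each `recordId` and expects every `recordId` back with a `result` of `Ok`, `Dropped`, or `ProcessingFailed`. The drop rule (records without `sensor_id`) and the trailing newline per record are illustrative choices.

```python
import base64
import json

def lambda_handler(event, context):
    """Firehose data-transformation handler (illustrative sketch)."""
    output = []
    for record in event['records']:
        record_id = record['recordId']
        try:
            payload = json.loads(base64.b64decode(record['data']))
        except ValueError:
            # Covers bad base64 and bad JSON; Firehose writes these under ErrorOutputPrefix
            output.append({'recordId': record_id, 'result': 'ProcessingFailed',
                           'data': record['data']})
            continue
        if not isinstance(payload, dict) or 'sensor_id' not in payload:
            # Dropped records are acknowledged but not delivered
            output.append({'recordId': record_id, 'result': 'Dropped',
                           'data': record['data']})
            continue
        # One JSON document per line makes the S3 objects easy to query later
        line = json.dumps(payload) + '\n'
        output.append({'recordId': record_id, 'result': 'Ok',
                       'data': base64.b64encode(line.encode('utf-8')).decode('ascii')})
    return {'records': output}
```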
Kinesis Analytics (SQL & Flink)
┌─────────────────────────────────────────────────────────────────────────────┐
│                       KINESIS ANALYTICS ARCHITECTURE                        │
│                                                                             │
│  Source stream ──► Application ──► Output                                   │
│                                                                             │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │  SQL APPLICATION (legacy)                                             │  │
│  │                                                                       │  │
│  │  CREATE OR REPLACE STREAM "output_stream" (                           │  │
│  │    sensor_id   VARCHAR(16),                                           │  │
│  │    temperature DOUBLE,                                                │  │
│  │    event_time  TIMESTAMP                                              │  │
│  │  );                                                                   │  │
│  │                                                                       │  │
│  │  CREATE OR REPLACE PUMP "stream_pump" AS                              │  │
│  │    INSERT INTO "output_stream"                                        │  │
│  │    SELECT STREAM sensor_id, temperature, event_time                   │  │
│  │    FROM "sensor_stream"                                               │  │
│  │    WHERE temperature > 50.0;                                          │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │  FLINK APPLICATION                                                    │  │
│  │                                                                       │  │
│  │  • Apache Flink runtime                                               │  │
│  │  • Java/Scala/Python support                                          │  │
│  │  • Window operations (tumbling, sliding, session)                     │  │
│  │  • State management                                                   │  │
│  │  • Exactly-once state consistency (checkpoints)                       │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────────────┘
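The window types in the Flink box differ in how an event's timestamp maps to windows. The plain-Python sketch below is not Flink's API. It shows tumbling and sliding window assignment, assuming epoch-second timestamps and windows aligned to the epoch. Session windows, which close after a gap of inactivity, are left out.

```python
from typing import List

def tumbling_window(ts: float, size: float) -> float:
    """Start of the one tumbling window of length `size` that contains ts."""
    return ts - ts % size

def sliding_windows(ts: float, size: float, slide: float) -> List[float]:
    """Starts of every sliding window [start, start + size) that contains ts."""
    start = ts - ts % slide  # latest window start at or before ts
    starts = []
    while start > ts - size:
        starts.append(start)
        start -= slide
    return sorted(starts)
```

A tumbling window puts each event in exactly one bucket, which is also what STEP(... BY INTERVAL '5' MINUTE) does in the SQL example. A sliding window of size 60 and slide 20 places each event in 60 / 20 = 3 overlapping windows.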
Kinesis Analytics SQL Example
-- Real-time aggregation: 5-minute tumbling windows per sensor.
-- "sensor_stream" stands for the application's in-application input stream
-- (named SOURCE_SQL_STREAM_001 by default).
CREATE OR REPLACE STREAM "aggregated_stream" (
    sensor_id       VARCHAR(16),
    avg_temperature DOUBLE,
    max_temperature DOUBLE,
    min_temperature DOUBLE,
    reading_count   INTEGER,
    window_start    TIMESTAMP
);
CREATE OR REPLACE PUMP "aggregate_pump" AS
INSERT INTO "aggregated_stream"
SELECT STREAM
    sensor_id,
    AVG(temperature) AS avg_temperature,
    MAX(temperature) AS max_temperature,
    MIN(temperature) AS min_temperature,
    COUNT(*) AS reading_count,
    STEP("sensor_stream".ROWTIME BY INTERVAL '5' MINUTE) AS window_start
FROM "sensor_stream"
GROUP BY sensor_id, STEP("sensor_stream".ROWTIME BY INTERVAL '5' MINUTE);
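-- Anomaly detection: flag readings more than 3 standard deviations above the
-- sensor's mean over a sliding window (10 minutes here, an illustrative choice).
-- A subquery over the whole unbounded stream cannot be evaluated, so the
-- statistics are computed per window with OVER.
CREATE OR REPLACE STREAM "anomaly_stream" (
    sensor_id    VARCHAR(16),
    temperature  DOUBLE,
    anomaly_type VARCHAR(32)
);
CREATE OR REPLACE PUMP "anomaly_pump" AS
INSERT INTO "anomaly_stream"
SELECT STREAM sensor_id, temperature, 'HIGH_TEMP' AS anomaly_type
FROM (
    SELECT STREAM
        sensor_id,
        temperature,
        AVG(temperature) OVER w AS avg_temp,
        STDDEV_SAMP(temperature) OVER w AS stddev_temp
    FROM "sensor_stream"
    WINDOW w AS (PARTITION BY sensor_id RANGE INTERVAL '10' MINUTE PRECEDING)
) AS windowed
WHERE temperature > avg_temp + 3 * stddev_temp;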
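The same 3-sigma idea can be written in plain Python. This is a sketch for testing thresholds offline, not a Kinesis API. It assumes time-ordered `(timestamp, sensor_id, temperature)` tuples and a 10-minute sliding window per sensor. It compares each reading against the preceding readings only, so a spike does not inflate its own baseline.

```python
from collections import defaultdict, deque
from statistics import mean, stdev
from typing import Deque, Dict, Iterable, List, Tuple

Reading = Tuple[float, str, float]  # (timestamp_s, sensor_id, temperature)

def detect_anomalies(readings: Iterable[Reading], window_s: float = 600.0,
                     k: float = 3.0, min_samples: int = 2) -> List[Reading]:
    """Flag readings more than k sample standard deviations above the mean of
    the same sensor's readings from the preceding `window_s` seconds.

    `readings` must be in timestamp order.
    """
    history: Dict[str, Deque[Tuple[float, float]]] = defaultdict(deque)
    anomalies = []
    for ts, sensor_id, temp in readings:
        window = history[sensor_id]
        # Evict readings that have slid out of the window
        while window and window[0][0] <= ts - window_s:
            window.popleft()
        values = [v for _, v in window]
        # stdev needs at least two values
        if len(values) >= max(min_samples, 2) and temp > mean(values) + k * stdev(values):
            anomalies.append((ts, sensor_id, temp))
        window.append((ts, temp))
    return anomalies
```

Including the current reading in its own window, as a SQL OVER window with RANGE ... PRECEDING does, makes small windows nearly blind. With n readings in the window, including the current one, a single value can sit at most (n − 1)/√n sample standard deviations above the mean. That bound stays below 3 until n reaches 11.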
Kinesis Best Practices
ℹ️ Pro Tip: Use Enhanced Fan-Out to give each consumer dedicated read throughput (2 MB/s per shard per consumer, instead of 2 MB/s per shard shared by all standard consumers). It suits streams that several consumers read at the same time.
Sharding Calculation
| Metric | Formula | Example |
|---|---|---|
| Inbound | Records/s × avg record size | 1,000 × 1 KB = 1 MB/s |
| Outbound (standard consumers) | Consumers × inbound (each consumer reads every record) | 3 × 1 MB/s = 3 MB/s |
| Shards needed | CEIL(MAX(inbound ÷ 1 MB/s, records/s ÷ 1,000, outbound ÷ 2 MB/s)) | CEIL(MAX(1, 1, 1.5)) = 2 shards |
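The table's formula can be expressed as a small helper. This sketch also applies the 1,000 records/s per-shard write limit and rounds up to whole shards. It treats each standard consumer as reading every record and uses decimal units (1 MB = 1,000 KB) to match the table. It adds no headroom for spikes, which a real plan should include.

```python
import math

SHARD_WRITE_MB_S = 1.0        # per-shard write limit
SHARD_WRITE_RECORDS_S = 1000  # per-shard write limit in records
SHARD_READ_MB_S = 2.0         # per-shard read limit shared by standard consumers

def shards_needed(records_per_s: float, avg_record_kb: float,
                  consumers: int, enhanced_fan_out: bool = False) -> int:
    """Minimum provisioned shard count for a steady load, without headroom."""
    inbound_mb_s = records_per_s * avg_record_kb / 1000  # decimal units, as in the table
    # Each standard consumer reads every record, so reads scale with consumers.
    # With Enhanced Fan-Out every consumer has its own 2 MB/s per shard, so reads
    # never need more shards than writes do.
    outbound_mb_s = inbound_mb_s if enhanced_fan_out else consumers * inbound_mb_s
    return math.ceil(max(inbound_mb_s / SHARD_WRITE_MB_S,
                         records_per_s / SHARD_WRITE_RECORDS_S,
                         outbound_mb_s / SHARD_READ_MB_S))
```

For 1,000 records/s of 1 KB and three standard consumers, reads drive the count to two shards. With Enhanced Fan-Out, the write limits alone give one shard, running at 100% of its write capacity.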
Cost Comparison
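| Component | Provisioned (us-east-1) | On-Demand |
|---|---|---|
| Per shard-hour | $0.015 | N/A (billed per stream-hour) |
| Per million PUT payload units (25 KB each) | $0.014 | N/A (billed per GB written) |
| Retention beyond 24 hours | $0.020 per shard-hour up to 7 days; $0.023 per GB-month beyond 7 days | Billed per GB-month |
| Enhanced Fan-Out | $0.015 per consumer-shard-hour + $0.013 per GB read | Supported; billed per GB read |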
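⚠️ Cost Warning: At steady, high utilization, on-demand mode usually costs several times more per GB than a well-sized provisioned stream; the gap narrows when provisioned shards sit mostly idle. Use provisioned mode when you can predict throughput requirements.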
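A rough monthly estimate shows how the provisioned charges combine. It assumes us-east-1 list prices of $0.015 per shard-hour and $0.014 per million PUT payload units, with each record billed in 25 KB units rounded up, and a 730-hour month. Retention, Enhanced Fan-Out, and data transfer are excluded. Prices change, so treat the output as illustrative.

```python
import math

SHARD_HOUR_USD = 0.015             # assumed us-east-1 price per shard-hour
PER_MILLION_PUT_UNITS_USD = 0.014  # assumed us-east-1 price per million PUT payload units
PUT_UNIT_KB = 25                   # records are billed in 25 KB units, rounded up
HOURS_PER_MONTH = 730              # assumption: average month length

def provisioned_monthly_cost(shards: int, records_per_s: float, avg_record_kb: float) -> float:
    """Rough monthly USD cost: shard-hours plus PUT payload units only."""
    units_per_record = math.ceil(avg_record_kb / PUT_UNIT_KB)
    put_units = records_per_s * HOURS_PER_MONTH * 3600 * units_per_record
    shard_cost = shards * HOURS_PER_MONTH * SHARD_HOUR_USD
    put_cost = put_units / 1_000_000 * PER_MILLION_PUT_UNITS_USD
    return round(shard_cost + put_cost, 2)
```

For example, 2 shards carrying 1,000 records/s of 1 KB each come to about $58.69 per month: $21.90 in shard-hours and $36.79 in PUT payload units.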
Interview Questions & Answers
Q1: What is the difference between Kinesis Data Streams and Firehose?
Answer:
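- Data Streams: real-time (typically sub-second), custom consumers, shard-based, provisioned or on-demand capacity, replay within the retention period
- Firehose: managed, near-real-time delivery (records are buffered first), auto-scaling, built-in destinations (S3, Redshift, OpenSearch, HTTP endpoints), no replay
Use Data Streams for custom processing or several independent consumers. Use Firehose for simple delivery to destinations.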
Q2: How do you handle data ordering in Kinesis?
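Answer: Data is ordered only within a shard. Choose partition keys deliberately:
- Records with the same partition key go to the same shard and are read in sequence-number order
- Order is guaranteed per shard, not across shards
- For global ordering, use a single shard (limited to 1 MB/s or 1,000 records/s of writes)
- Resending failed PutRecords entries can reorder records for a key; PutRecord with SequenceNumberForOrdering keeps strict per-key order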
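Kinesis maps a partition key to a shard by taking the MD5 hash of the key as a 128-bit integer and finding the shard whose hash key range contains it. The sketch assumes the key is hashed as its UTF-8 bytes and that shards split the hash space evenly, as in a newly created stream. After resharding, the real ranges come from ListShards.

```python
import hashlib
from typing import List, Tuple

HASH_SPACE = 2 ** 128  # MD5 yields a 128-bit integer

def even_hash_ranges(shard_count: int) -> List[Tuple[int, int]]:
    """(start, end) hash key ranges for shards that split the space evenly."""
    step = HASH_SPACE // shard_count
    ranges = []
    for i in range(shard_count):
        start = i * step
        # The last shard absorbs any remainder so the ranges cover the whole space
        end = HASH_SPACE - 1 if i == shard_count - 1 else start + step - 1
        ranges.append((start, end))
    return ranges

def shard_for_key(partition_key: str, ranges: List[Tuple[int, int]]) -> int:
    """Index of the shard whose range contains MD5(partition_key) as an integer."""
    hashed = int.from_bytes(hashlib.md5(partition_key.encode('utf-8')).digest(), 'big')
    for index, (start, end) in enumerate(ranges):
        if start <= hashed <= end:
            return index
    raise ValueError('hash key ranges do not cover this key')
```

Because the mapping depends only on the key, every reading for one sensor lands on the same shard and keeps its order, while different sensors spread across shards. The flip side is that one hot key can never use more than one shard's capacity.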
Q3: What is Enhanced Fan-Out and when should you use it?
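Answer: Enhanced Fan-Out gives each registered consumer its own 2 MB/s of read throughput per shard, pushed over HTTP/2 via SubscribeToShard, instead of all consumers sharing 2 MB/s per shard. Use it when:
- Multiple consumers read the same stream
- You need low latency (around 70 ms average propagation delay, versus around 200 ms for polling consumers)
- Each consumer needs consistent throughput regardless of the others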
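The following back-of-the-envelope comparison shows the read throughput each consumer can count on across a stream. It ignores the separate limit of five GetRecords calls per second per shard, which also squeezes polling consumers as more are added.

```python
SHARD_READ_MB_S = 2.0  # read limit per shard

def per_consumer_read_mb_s(consumers: int, shards: int, enhanced_fan_out: bool) -> float:
    """Stream-wide read throughput (MB/s) available to each consumer.

    Standard consumers share 2 MB/s per shard; each Enhanced Fan-Out
    consumer gets its own 2 MB/s per shard.
    """
    if consumers < 1 or shards < 1:
        raise ValueError('consumers and shards must be at least 1')
    stream_read = SHARD_READ_MB_S * shards
    return stream_read if enhanced_fan_out else stream_read / consumers
```

With 2 shards and 4 consumers, each standard consumer averages 1 MB/s, while each Enhanced Fan-Out consumer gets the full 4 MB/s.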
Q4: How does Kinesis handle data retention?
Answer:
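- Default: 24 hours
- Maximum: 365 days
- Extended retention costs extra (provisioned, us-east-1): $0.020 per shard-hour up to 7 days, then $0.023 per GB-month for data kept beyond 7 days
- For long-term storage, archive to S3 instead (for example, with Firehose)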
Q5: What is the difference between Provisioned and On-Demand modes?
Answer:
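- Provisioned: you set the shard count and scale it yourself (e.g., with UpdateShardCount); lower cost when traffic is steady
- On-Demand: capacity scales automatically and you pay per GB written and read; higher cost per GB at sustained throughput
Use Provisioned for predictable workloads and On-Demand for variable or unpredictable ones.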
Summary
Kinesis is AWS's core streaming platform. Key takeaways:
- Data Streams: Shard-based, custom consumers, ordered per shard
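- Firehose (now Amazon Data Firehose): managed delivery, auto-scaling, S3/Redshift/OpenSearch destinations
- Analytics: SQL (being discontinued by AWS) and Apache Flink (now Amazon Managed Service for Apache Flink) for real-time processing
- Sharding: 1 MB/s (1,000 records/s) in and 2 MB/s out per shard; plan shard counts carefully
- Cost: Provisioned is cheaper for predictable workloads; On-Demand suits variable traffic
- Fan-Out: use Enhanced Fan-Out when multiple consumers each need dedicated read throughput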