Real-Time Pipeline: Event Hubs → Stream Analytics → Cosmos DB
End-to-end real-time streaming pipeline with Event Hubs, Stream Analytics, and Cosmos DB for low-latency analytics
Streaming Pipeline Architecture
┌────────────────────────────────────────────────────────────────────┐
│                    REAL-TIME STREAMING PIPELINE                    │
├────────────────────────────────────────────────────────────────────┤
│                                                                    │
│  DATA SOURCES        INGESTION           PROCESSING                │
│  ┌──────────┐        ┌──────────┐        ┌──────────────┐          │
│  │   IoT    │───────>│  Event   │───────>│    Stream    │          │
│  │ Devices  │        │   Hubs   │        │  Analytics   │          │
│  └──────────┘        └──────────┘        │              │          │
│                                          │ • Windowing  │          │
│  ┌──────────┐        ┌──────────┐        │ • Joins      │          │
│  │   App    │───────>│  Event   │───────>│ • Aggregation│          │
│  │  Events  │        │   Hubs   │        │ • Filtering  │          │
│  └──────────┘        └──────────┘        └──┬───────┬───┘          │
│                                             │       │              │
│  ┌──────────┐        ┌──────────┐           │       │              │
│  │   Log    │───────>│  Event   │───────────┘       │              │
│  │ Streams  │        │   Hubs   │                   │              │
│  └──────────┘        └──────────┘                   │              │
│  STORAGE             VISUALIZATION       ALERTING   │              │
│       ┌─────────────────────────────────────────────┤              │
│       ▼                                             ▼              │
│  ┌──────────┐        ┌──────────────┐    ┌──────────────┐          │
│  │  Cosmos  │<───────│   Power BI   │    │    Azure     │          │
│  │    DB    │        │  Dashboard   │    │  Functions   │          │
│  └────┬─────┘        └──────────────┘    │   (Alerts)   │          │
│       │                                  └──────────────┘          │
│       │                                                            │
│       ▼                                                            │
│  ┌──────────┐        ┌──────────────┐                              │
│  │   ADLS   │<───────│  Event Hubs  │                              │
│  │   Gen2   │        │   Capture    │                              │
│  └──────────┘        └──────────────┘                              │
└────────────────────────────────────────────────────────────────────┘
Event Hubs Configuration
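{
  "name": "iot-telemetry",
  "properties": {
    "partitionCount": 16,
    "messageRetentionInDays": 7,
    "captureDescription": {
      "enabled": true,
      "encoding": "Avro",
      "intervalInSeconds": 300,
      "sizeLimitInBytes": 104857600,
      "destination": {
        "name": "EventHubArchive.AzureBlockBlob",
        "properties": {
          "storageAccountResourceId": "/subscriptions/xxx/resourceGroups/rg/providers/Microsoft.Storage/storageAccounts/stdatalake001",
          "blobContainer": "event-capture",
          "archiveNameFormat": "{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}"
        }
      }
    }
  }
}
Native Capture writes Avro files ("Avro" or "AvroDeflate" encoding); Parquet output is produced through the Stream Analytics no-code editor rather than through this setting. A capture file is closed after 300 seconds or 100 MB (104857600 bytes), whichever comes first.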
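A small Python sketch can sanity-check the capture settings before deployment and show where a capture file will land. The helper names `validate_capture` and `render_capture_path` are hypothetical; the allowed ranges (60 to 900 seconds, 10 MB to 500 MB) and the nine required tokens reflect the Event Hubs Capture documentation at the time of writing, so verify them against the current docs.

```python
from datetime import datetime, timezone

# Tokens Event Hubs Capture expects in archiveNameFormat (assumed: all nine required)
REQUIRED_TOKENS = ("{Namespace}", "{EventHub}", "{PartitionId}", "{Year}",
                   "{Month}", "{Day}", "{Hour}", "{Minute}", "{Second}")


def validate_capture(interval_s: int, size_limit_bytes: int, name_format: str) -> list:
    """Return a list of problems; an empty list means the settings look valid."""
    problems = []
    if not 60 <= interval_s <= 900:
        problems.append("intervalInSeconds must be between 60 and 900")
    if not 10 * 1024**2 <= size_limit_bytes <= 500 * 1024**2:
        problems.append("sizeLimitInBytes must be between 10 MB and 500 MB")
    missing = [t for t in REQUIRED_TOKENS if t not in name_format]
    if missing:
        problems.append(f"archiveNameFormat is missing {', '.join(missing)}")
    return problems


def render_capture_path(name_format: str, namespace: str, hub: str,
                        partition_id: int, window_start: datetime) -> str:
    """Fill the archiveNameFormat tokens; date/time parts are zero-padded UTC."""
    ts = window_start.astimezone(timezone.utc)
    values = {
        "Namespace": namespace, "EventHub": hub, "PartitionId": str(partition_id),
        "Year": f"{ts.year:04d}", "Month": f"{ts.month:02d}", "Day": f"{ts.day:02d}",
        "Hour": f"{ts.hour:02d}", "Minute": f"{ts.minute:02d}",
        "Second": f"{ts.second:02d}",
    }
    return name_format.format(**values)
```

With the format above, partition 3 of `iot-telemetry` in namespace `ns-iot` captured at 13:05:00 UTC on 2024-05-01 would be written under `ns-iot/iot-telemetry/3/2024/05/01/13/05/00`, followed by the file extension Capture adds.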
Stream Analytics Query
-- Real-time aggregation: per-device stats over 1-minute tumbling windows
-- (PARTITION BY accepts only PartitionId, so it is omitted here; compatibility
-- level 1.2 parallelizes the input automatically)
SELECT
    System.Timestamp() AS window_end,
    device_id,
    COUNT(*) AS event_count,
    AVG(temperature) AS avg_temperature,
    MAX(temperature) AS max_temperature,
    MIN(humidity) AS min_humidity
INTO [cosmos-output]
FROM [eventhub-input]
GROUP BY
    TumblingWindow(minute, 1),
    device_id;
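-- Anomaly detection: per-device spike/dip scoring with the built-in ML function
-- (confidence 95%, history of 100 events; the LIMIT DURATION window should
-- span at least that many events per device)
SELECT
    device_id,
    System.Timestamp() AS event_time,
    temperature,
    AnomalyDetection_SpikeAndDip(
        CAST(temperature AS float), 95, 100, 'spikesanddips'
    ) OVER (PARTITION BY device_id LIMIT DURATION(minute, 20)) AS spike_result
INTO [alert-output]
FROM [eventhub-input];
-- spike_result is a record with Score and IsAnomaly fields; the alerting
-- Azure Function acts only on records where IsAnomaly = 1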
-- Enrichment join with reference data: reference joins match on keys only
-- (DATEDIFF time bounds apply to stream-to-stream joins)
SELECT
    e.device_id,
    e.temperature,
    System.Timestamp() AS event_time,
    r.location,
    r.threshold_min,
    r.threshold_max
INTO [enriched-output]
FROM [eventhub-input] e
JOIN [device-reference] r
    ON e.device_id = r.device_id;
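To make the first and third queries concrete, the sketch below emulates them in plain Python on a list of producer events. It is an illustration, not how Stream Analytics runs: it windows on the producer's `timestamp` field (the job above windows on arrival time unless `TIMESTAMP BY` is added), and it assumes the documented tumbling-window convention that a window excludes its start and includes its end. The function names are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from statistics import mean

WINDOW = timedelta(minutes=1)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_end(ts: datetime) -> datetime:
    """End of the 1-minute tumbling window (start, end] that contains ts."""
    q, r = divmod(ts - EPOCH, WINDOW)
    return EPOCH + (q + (1 if r else 0)) * WINDOW


def tumbling_aggregate(events):
    """Mimic the aggregation query: per-device stats per 1-minute window."""
    groups = defaultdict(list)
    for e in events:
        ts = datetime.strptime(e["timestamp"], "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
        groups[(window_end(ts), e["device_id"])].append(e)
    rows = []
    for (end, device), evs in sorted(groups.items()):
        temps = [e["temperature"] for e in evs]
        rows.append({
            "window_end": end,
            "device_id": device,
            "event_count": len(evs),
            "avg_temperature": mean(temps),
            "max_temperature": max(temps),
            "min_humidity": min(e["humidity"] for e in evs),
        })
    return rows


def enrich(events, reference):
    """Mimic the reference-data inner join: a key lookup; unmatched events drop."""
    out = []
    for e in events:
        r = reference.get(e["device_id"])
        if r is not None:
            out.append({**e, "location": r["location"],
                        "threshold_min": r["threshold_min"],
                        "threshold_max": r["threshold_max"]})
    return out
```

Because the reference join is a pure key lookup, an event whose `device_id` is missing from the reference set is dropped, which is why a `LEFT OUTER JOIN` is worth considering when devices can appear before their reference rows.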
Python Producer
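import json
import random
import time

from azure.eventhub import EventData, EventHubProducerClient

# Connection string is elided; supply your namespace's key
producer = EventHubProducerClient.from_connection_string(
    conn_str="Endpoint=sb://ns-iot.servicebus.windows.net/;SharedAccessKeyName=...;",
    eventhub_name="iot-telemetry",
)

with producer:  # closes the AMQP connection when the loop exits
    while True:
        batch = producer.create_batch()  # respects the hub's max batch size
        for _ in range(100):
            event = {
                "device_id": f"sensor-{random.randint(1, 1000):04d}",
                "temperature": round(random.uniform(60, 100), 2),
                "humidity": round(random.uniform(30, 80), 2),
                # gmtime() so the trailing "Z" really means UTC
                "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            }
            try:
                batch.add(EventData(json.dumps(event)))
            except ValueError:
                # Batch is full: send it and start a new one with this event
                producer.send_batch(batch)
                batch = producer.create_batch()
                batch.add(EventData(json.dumps(event)))
        producer.send_batch(batch)
        time.sleep(1)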
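Pro Tip: Choose partition keys deliberately. Using device_id as the partition key routes every event from a device to the same partition, which preserves per-device ordering. Many distinct keys usually spread load well, but a few very chatty devices can still create hot partitions, so monitor per-partition traffic.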
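The sketch below shows how to quantify partition skew before it bites. In the azure-eventhub SDK the key is passed as `producer.create_batch(partition_key=device_id)`, but the service's key-to-partition hash is internal, so this sketch substitutes CRC32 modulo the partition count as a stand-in; the function names are hypothetical. A skew of 1.0 means perfectly even load, and a value equal to the partition count means one partition receives everything.

```python
import zlib
from collections import Counter


def partition_for(key: str, partition_count: int) -> int:
    """Stand-in key-to-partition mapping (CRC32 mod N); NOT the Event Hubs hash."""
    return zlib.crc32(key.encode("utf-8")) % partition_count


def partition_load(events, partition_count: int) -> Counter:
    """Count events per partition when each event is keyed by its device_id."""
    return Counter(partition_for(e["device_id"], partition_count) for e in events)


def skew(load: Counter, partition_count: int) -> float:
    """Busiest partition's load divided by the mean load across all partitions."""
    total = sum(load.values())
    if total == 0:
        return 0.0
    return max(load.values()) / (total / partition_count)
```

Feeding it a day of real traffic per device (rather than one event per device) is what surfaces hot partitions: a single device sending half of all events pushes skew toward half the partition count no matter how many distinct keys exist.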
Interview Questions
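Q1: How do you ensure exactly-once processing in this streaming pipeline? A: Aim for effectively-once results: at-least-once delivery combined with idempotent writes. Event Hubs does not track readers itself; each consumer (per consumer group) keeps its own checkpoints. Stream Analytics checkpoints its input positions and window state and resumes from the last checkpoint after a restart, which can re-emit some results. Configure the Cosmos DB output's Document ID with a deterministic key (for example device_id plus window_end) so that replayed results upsert onto the same document instead of creating duplicates.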
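A deterministic document id is what turns replays into harmless overwrites. In this hypothetical sketch a Python dict stands in for a Cosmos DB container and the `device_id:window_end` id format is an illustrative choice, not something Stream Analytics generates for you; writing the same aggregate row twice leaves a single document.

```python
from datetime import datetime, timezone


def document_id(device_id: str, window_end: datetime) -> str:
    """Same (device, window) always yields the same id, e.g. 'sensor-0001:2024-05-01T12:01:00Z'."""
    return f"{device_id}:{window_end.astimezone(timezone.utc):%Y-%m-%dT%H:%M:%SZ}"


def upsert(store: dict, row: dict) -> None:
    """Insert-or-replace keyed by id, mimicking an upsert against a container."""
    doc = {**row, "id": document_id(row["device_id"], row["window_end"])}
    store[doc["id"]] = doc
```

A random id (such as a UUID generated per write) would defeat this: every replay would look like a new document.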
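Q2: What are the scaling limits for this architecture? A: Event Hubs (Standard): each throughput unit (TU) allows 1 MB/s or 1,000 events/s of ingress, whichever limit is hit first, up to 40 TUs (about 40 MB/s) per namespace; the Premium and Dedicated tiers go further. Stream Analytics: throughput per streaming unit (SU) depends on query complexity, so there is no fixed MB/s per SU; benchmark the job and scale SUs together with input partitions. Cosmos DB: throughput is provisioned (or autoscaled up to a configured maximum) in RU/s and spread across physical partitions, each capped at 10,000 RU/s, so a well-distributed partition key matters as much as the total. Scale each component independently based on measured throughput.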
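The Event Hubs arithmetic is easy to get wrong because two limits apply at once. This sketch sizes Standard-tier TUs from an expected event rate and average event size, taking the per-TU limits quoted above as assumptions and treating 1 MB as 1,048,576 bytes; `required_tus` is a hypothetical helper.

```python
import math

# Standard-tier ingress limits per throughput unit (assumed from the figures above)
TU_INGRESS_BYTES_PER_S = 1024 * 1024  # 1 MB/s
TU_INGRESS_EVENTS_PER_S = 1000        # 1,000 events/s
STANDARD_MAX_TU = 40


def required_tus(events_per_s: float, avg_event_bytes: float) -> int:
    """TUs needed: whichever ingress limit (bytes or events) binds first decides."""
    by_bytes = math.ceil(events_per_s * avg_event_bytes / TU_INGRESS_BYTES_PER_S)
    by_events = math.ceil(events_per_s / TU_INGRESS_EVENTS_PER_S)
    return max(1, by_bytes, by_events)
```

The sample producer sends roughly 100 small JSON events per second, which fits in one TU. At 50,000 events/s of about 100 bytes each, the event-count limit dominates and calls for 50 TUs, which exceeds the Standard ceiling even though the byte rate alone would need only 5.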
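Q3: How do you handle backpressure in the streaming pipeline? A: Event Hubs is the buffer: it retains events for the configured retention period (7 days here), so when Stream Analytics falls behind, events accumulate in Event Hubs instead of being rejected at the source. Stream Analytics reads only as fast as its SUs allow, and when the Cosmos DB output is throttled (HTTP 429) the job retries, which slows consumption further. Monitor the job's Watermark Delay and Backlogged Input Events metrics, and add SUs or Cosmos DB RU/s when they keep rising; any backlog that ages past the retention period is lost.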
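Back-of-envelope backlog math helps decide how urgently to scale. This sketch assumes constant input and processing rates (events/s) and a consumer that starts with no backlog; the function names are hypothetical. If processing runs at rate p below input rate r, after t seconds the consumer has read events enqueued up to time p·t/r, so the oldest unread event is t·(1 − p/r) seconds old and data loss starts once that exceeds the retention period.

```python
def backlog_after(seconds: float, input_rate: float, processing_rate: float,
                  initial_backlog: float = 0.0) -> float:
    """Events waiting in Event Hubs after `seconds` at constant rates (never negative)."""
    return max(0.0, initial_backlog + (input_rate - processing_rate) * seconds)


def seconds_to_drain(backlog: float, input_rate: float, processing_rate: float) -> float:
    """Time to clear a backlog once processing outpaces input; inf if it never will."""
    surplus = processing_rate - input_rate
    return backlog / surplus if surplus > 0 else float("inf")


def seconds_until_loss(input_rate: float, processing_rate: float, retention_s: float) -> float:
    """Starting from no backlog, time until the oldest unread event outlives retention."""
    if processing_rate >= input_rate:
        return float("inf")
    return retention_s / (1 - processing_rate / input_rate)
```

For example, a job processing 500 events/s against 1,000 events/s of input with 7-day retention starts losing data after about 14 days, while a 60,000-event backlog drains in 120 seconds once processing reaches 1,500 events/s against 1,000 events/s of input.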