Snowflake Streaming Data Ingestion
Snowflake supports real-time data ingestion through Snowpipe Streaming, enabling low-latency data delivery for time-sensitive analytics and operational workloads.
Architecture Overview
<svg width="800" height="450" viewBox="0 0 800 450" xmlns="http://www.w3.org/2000/svg">
<defs>
<linearGradient id="streamGrad" x1="0%" y1="0%" x2="100%" y2="0%">
<stop offset="0%" style="stop-color:#FF6B6B;stop-opacity:1" />
<stop offset="100%" style="stop-color:#FF8E8E;stop-opacity:1" />
</linearGradient>
<linearGradient id="pipeGrad" x1="0%" y1="0%" x2="100%" y2="0%">
<stop offset="0%" style="stop-color:#4ECDC4;stop-opacity:1" />
<stop offset="100%" style="stop-color:#7EDDD6;stop-opacity:1" />
</linearGradient>
</defs>
<text x="400" y="30" text-anchor="middle" font-size="18" font-weight="bold" fill="#333">Snowflake Streaming Ingestion Architecture</text>
<rect x="30" y="60" width="150" height="120" rx="10" fill="#6C5CE7" opacity="0.9"/>
<text x="105" y="85" text-anchor="middle" font-size="12" fill="white" font-weight="bold">Data Sources</text>
<text x="105" y="110" text-anchor="middle" font-size="10" fill="white">IoT Sensors</text>
<text x="105" y="125" text-anchor="middle" font-size="10" fill="white">App Events</text>
<text x="105" y="140" text-anchor="middle" font-size="10" fill="white">Logs</text>
<text x="105" y="155" text-anchor="middle" font-size="10" fill="white">Transactions</text>
<path d="M180 120 L230 120" stroke="#333" stroke-width="2" fill="none" marker-end="url(#arrowStream)"/>
<rect x="230" y="60" width="150" height="120" rx="10" fill="url(#streamGrad)" opacity="0.9"/>
<text x="305" y="85" text-anchor="middle" font-size="12" fill="white" font-weight="bold">Message Queue</text>
<text x="305" y="110" text-anchor="middle" font-size="10" fill="white">Kafka</text>
<text x="305" y="125" text-anchor="middle" font-size="10" fill="white">Kinesis</text>
<text x="305" y="140" text-anchor="middle" font-size="10" fill="white">Event Hub</text>
<text x="305" y="155" text-anchor="middle" font-size="10" fill="white">Pub/Sub</text>
<path d="M380 120 L430 120" stroke="#333" stroke-width="2" fill="none" marker-end="url(#arrowStream)"/>
<rect x="430" y="60" width="150" height="120" rx="10" fill="url(#pipeGrad)" opacity="0.9"/>
<text x="505" y="85" text-anchor="middle" font-size="12" fill="white" font-weight="bold">Connector Layer</text>
<text x="505" y="110" text-anchor="middle" font-size="10" fill="white">Snowpipe Streaming</text>
<text x="505" y="125" text-anchor="middle" font-size="10" fill="white">Kafka Connector</text>
<text x="505" y="140" text-anchor="middle" font-size="10" fill="white">REST API</text>
<text x="505" y="155" text-anchor="middle" font-size="10" fill="white">SDK</text>
<path d="M580 120 L630 120" stroke="#333" stroke-width="2" fill="none" marker-end="url(#arrowStream)"/>
<rect x="630" y="60" width="140" height="120" rx="10" fill="#29B5E8" opacity="0.9"/>
<text x="700" y="85" text-anchor="middle" font-size="12" fill="white" font-weight="bold">Snowflake</text>
<text x="700" y="110" text-anchor="middle" font-size="10" fill="white">Staging Table</text>
<text x="700" y="125" text-anchor="middle" font-size="10" fill="white">Snowpipe</text>
<text x="700" y="140" text-anchor="middle" font-size="10" fill="white">Target Table</text>
<text x="700" y="155" text-anchor="middle" font-size="10" fill="white">Streams</text>
<rect x="230" y="220" width="540" height="80" rx="10" fill="#F39C12" opacity="0.9"/>
<text x="500" y="245" text-anchor="middle" font-size="14" fill="white" font-weight="bold">Streaming Latency Targets</text>
<text x="350" y="275" text-anchor="middle" font-size="11" fill="white">Real-time: <1s</text>
<text x="500" y="275" text-anchor="middle" font-size="11" fill="white">Near-real-time: 1-5s</text>
<text x="650" y="275" text-anchor="middle" font-size="11" fill="white">Micro-batch: 5-60s</text>
<rect x="30" y="330" width="180" height="90" rx="10" fill="#27AE60" opacity="0.85"/>
<text x="120" y="355" text-anchor="middle" font-size="12" fill="white" font-weight="bold">Benefits</text>
<text x="120" y="375" text-anchor="middle" font-size="10" fill="white">Low latency ingestion</text>
<text x="120" y="390" text-anchor="middle" font-size="10" fill="white">No file staging needed</text>
<text x="120" y="405" text-anchor="middle" font-size="10" fill="white">Real-time analytics</text>
<rect x="230" y="330" width="180" height="90" rx="10" fill="#8E44AD" opacity="0.85"/>
<text x="320" y="355" text-anchor="middle" font-size="12" fill="white" font-weight="bold">Features</text>
<text x="320" y="375" text-anchor="middle" font-size="10" fill="white">Exactly-once delivery</text>
<text x="320" y="390" text-anchor="middle" font-size="10" fill="white">Schema evolution</text>
<text x="320" y="405" text-anchor="middle" font-size="10" fill="white">Auto-scaling</text>
<rect x="430" y="330" width="180" height="90" rx="10" fill="#E74C3C" opacity="0.85"/>
<text x="520" y="355" text-anchor="middle" font-size="12" fill="white" font-weight="bold">Use Cases</text>
<text x="520" y="375" text-anchor="middle" font-size="10" fill="white">IoT analytics</text>
<text x="520" y="390" text-anchor="middle" font-size="10" fill="white">Fraud detection</text>
<text x="520" y="405" text-anchor="middle" font-size="10" fill="white">Real-time dashboards</text>
<defs>
<marker id="arrowStream" markerWidth="10" markerHeight="10" refX="9" refY="3" orient="auto" markerUnits="strokeWidth">
<path d="M0,0 L0,6 L9,3 z" fill="#333"/>
</marker>
</defs>
</svg>
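The "Exactly-once delivery" feature in the diagram depends on offset tokens. Each Snowpipe Streaming channel records the offset token committed with its latest rows. A restarted client resumes from that token instead of from the beginning. (The Java ingest SDK reads the token back with `getLatestCommittedOffsetToken()`.) The sketch below simulates the idea in memory. `Channel` and `replay` are hypothetical names, not the SDK's API. It also assumes that rows and their token commit atomically.

```python
from dataclasses import dataclass, field


@dataclass
class Channel:
    """Hypothetical in-memory stand-in for a Snowpipe Streaming channel."""
    rows: list = field(default_factory=list)
    committed_offset: int = -1  # offset token of the last committed row (-1 = none yet)

    def insert(self, row: dict, offset: int) -> None:
        # Assumption: a row and its offset token are committed together
        self.rows.append(row)
        self.committed_offset = offset


def replay(source: list, channel: Channel) -> None:
    """Resume from the last committed offset so no row is written twice."""
    start = channel.committed_offset + 1
    for offset in range(start, len(source)):
        channel.insert(source[offset], offset)


source = [{"event_id": f"e{i}"} for i in range(5)]
channel = Channel()

# First run "crashes" after committing offsets 0..2
for offset in range(3):
    channel.insert(source[offset], offset)

# Restart: ask the channel where it left off and replay only the remainder
replay(source, channel)
print([r["event_id"] for r in channel.rows])  # ['e0', 'e1', 'e2', 'e3', 'e4']
```

If the client instead replayed from offset 0 after the crash, `e0` to `e2` would be written twice. The committed token is what turns at-least-once retries into exactly-once results.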
Snowpipe Streaming
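Table and Pipe Setup
Snowpipe Streaming clients, such as the Java ingest SDK or the Kafka connector, write rows directly into the target table without staging files. The stage-based pipe below is the file-based Snowpipe path. It can run alongside streaming for bulk or backfill loads.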
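-- Target table shared by streaming and file-based loads
CREATE TABLE streaming_events (
    event_id VARCHAR(36),
    event_type VARCHAR(50),
    user_id INT,
    event_timestamp TIMESTAMP_NTZ,
    payload VARIANT,
    ingestion_time TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

-- File-based Snowpipe: AUTO_INGEST = TRUE requires an external stage
-- configured with cloud event notifications (S3, GCS, or Azure)
CREATE OR REPLACE PIPE streaming_pipe
  AUTO_INGEST = TRUE
AS
COPY INTO streaming_events (event_id, event_type, user_id, event_timestamp, payload)
FROM (
    -- A JSON file format yields a single VARIANT column ($1), so each field
    -- is extracted explicitly; ingestion_time is omitted and keeps its default
    SELECT
        $1:event_id::VARCHAR,
        $1:event_type::VARCHAR,
        $1:user_id::INT,
        $1:event_timestamp::TIMESTAMP_NTZ,
        $1:payload
    FROM @streaming_stage
)
FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = TRUE)
ON_ERROR = 'CONTINUE';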
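Loading JSON into a multi-column table means pulling each field out of the single semi-structured value per record, because a JSON file format produces one VARIANT value per record. The sketch below approximates that mapping for one staged file so the row shape is easy to inspect. It assumes `STRIP_OUTER_ARRAY = TRUE` semantics, ISO-8601 timestamps, and that missing fields load as NULL (`None`). `parse_event_file` is a hypothetical helper, not part of any Snowflake library.

```python
import json
from datetime import datetime


def parse_event_file(text: str, load_time: datetime) -> list:
    """Approximate a COPY transformation over one JSON file (hypothetical helper)."""
    rows = []
    # STRIP_OUTER_ARRAY = TRUE: each element of the top-level array becomes one row
    for record in json.loads(text):
        ts = record.get("event_timestamp")
        user_id = record.get("user_id")
        rows.append({
            "event_id": record.get("event_id"),      # like $1:event_id::VARCHAR
            "event_type": record.get("event_type"),  # like $1:event_type::VARCHAR
            "user_id": int(user_id) if user_id is not None else None,  # like $1:user_id::INT
            "event_timestamp": datetime.fromisoformat(ts) if ts else None,
            "payload": record.get("payload"),        # nested object kept as-is (VARIANT)
            "ingestion_time": load_time,             # stands in for the column DEFAULT
        })
    return rows


sample = (
    '[{"event_id": "a1", "event_type": "click", "user_id": "42", '
    '"event_timestamp": "2024-01-01T12:00:00", "payload": {"page": "/home"}}]'
)
rows = parse_event_file(sample, datetime(2024, 1, 1, 12, 0, 3))
print(rows[0]["user_id"], rows[0]["payload"])  # 42 {'page': '/home'}
```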
Kafka Connector Configuration
{
  "name": "snowflake-connector",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
    "snowflake.url.name": "<account_identifier>.snowflakecomputing.com:443",
    "snowflake.user.name": "<streaming_user>",
    "snowflake.private.key": "${SNOWFLAKE_PRIVATE_KEY}",
    "snowflake.role.name": "streaming_role",
    "snowflake.database.name": "my_database",
    "snowflake.schema.name": "streaming_schema",
    "topics": "events-topic",
    "snowflake.topic2table.map": "events-topic:streaming_events",
    "snowflake.enable.schematization": "true",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
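A connector configuration like this is registered by POSTing `{"name": ..., "config": ...}` to a Kafka Connect worker's `/connectors` REST endpoint. The sketch below builds that request with the standard library but does not send it. It assumes a worker at `http://localhost:8083` (Kafka Connect's default REST port) and reads the private key from an environment variable. Both are illustrative choices, and `build_connector_request` is a hypothetical helper.

```python
import json
import os
import urllib.request


def build_connector_request(connect_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build (but do not send) a Kafka Connect REST request that registers a connector."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


config = {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
    "topics": "events-topic",
    # Assumption: the key is injected via the environment rather than hard-coded
    "snowflake.private.key": os.environ.get("SNOWFLAKE_PRIVATE_KEY", ""),
}

req = build_connector_request("http://localhost:8083", "snowflake-connector", config)
# To actually register the connector against a running worker:
# urllib.request.urlopen(req)
```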
Performance Tuning
[Charts: Streaming Throughput; End-to-End Latency]
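On the Kafka connector side, the throughput/latency trade-off is largely set by the per-partition buffer properties `buffer.count.records`, `buffer.size.bytes`, and `buffer.flush.time`. The sketch below assumes a buffer is flushed as soon as the first of these limits is reached and that records arrive at a steady rate. Under those assumptions, it estimates how long records sit in the buffer. The numbers in the example are illustrative inputs, not the connector's defaults.

```python
def estimated_flush_interval(records_per_sec: float, avg_record_bytes: float,
                             buffer_count_records: int, buffer_size_bytes: int,
                             buffer_flush_time_s: float) -> float:
    """Seconds until the first flush threshold is reached, assuming a steady arrival rate."""
    time_to_count = buffer_count_records / records_per_sec
    time_to_size = buffer_size_bytes / (records_per_sec * avg_record_bytes)
    # Assumption: the buffer flushes as soon as ANY threshold is hit
    return min(buffer_flush_time_s, time_to_count, time_to_size)


# Low traffic: the time threshold dominates
print(estimated_flush_interval(100, 500, 10_000, 20_000_000, 10.0))     # 10.0
# High traffic: the record-count threshold dominates
print(estimated_flush_interval(10_000, 500, 10_000, 20_000_000, 10.0))  # 1.0
```

For low-volume topics, lowering the time threshold is what reduces latency. For high-volume topics, the count and size limits decide how often data is committed.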
Channel Management
-- List the Snowpipe Streaming channels in the schema
SHOW CHANNELS IN SCHEMA my_database.streaming_schema;

-- Post-process the SHOW output (its column names are lowercase, so quote them).
-- Comparing offset_token with the latest offset in the source
-- shows how far each channel lags behind.
SELECT
    "name" AS channel_name,
    "table_name",
    "offset_token",
    "created_on"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));
Batch vs Streaming Comparison
| Feature | Batch (Snowpipe) | Streaming | Hybrid |
|---|---|---|---|
| Latency | 1-5 minutes | Seconds | 1-60 seconds |
| Throughput | Very High | High | Medium-High |
| Cost | Lower for large, infrequent files | Often lower for continuous small-record streams | Medium |
| Complexity | Low | High | Medium |
| Use Case | Historical loads | Real-time | Operational |
For streaming workloads, monitor channels regularly. A channel whose committed offset token stops advancing, or falls far behind the source, delays data in the target table. Set up alerts for when that lag exceeds your SLA threshold.
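A lag alert can be expressed as "latest source offset minus last committed offset token". The sketch below assumes each channel's offset token is the stringified source offset, for example a Kafka partition offset. It also assumes the committed token was already fetched, for example from the `offset_token` column of `SHOW CHANNELS` or from the SDK. The channel names, the snapshot data, and the 100-record threshold are hypothetical.

```python
from typing import Dict, List, Optional, Tuple


def channel_lag(latest_source_offset: int, committed_offset_token: Optional[str]) -> int:
    """Records not yet committed, assuming the token is the stringified source offset."""
    committed = int(committed_offset_token) if committed_offset_token is not None else -1
    return max(0, latest_source_offset - committed)


def lagging_channels(channels: Dict[str, Tuple[int, Optional[str]]],
                     max_lag_records: int) -> List[str]:
    """Names of channels whose lag exceeds the SLA threshold, sorted for stable alerts."""
    return sorted(
        name
        for name, (source_offset, token) in channels.items()
        if channel_lag(source_offset, token) > max_lag_records
    )


# Hypothetical snapshot: channel -> (latest source offset, committed offset token)
snapshot = {
    "events_p0": (1_000, "995"),
    "events_p1": (1_000, "400"),
    "events_p2": (1_000, None),  # nothing committed yet
}
print(lagging_channels(snapshot, max_lag_records=100))  # ['events_p1', 'events_p2']
```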
Exactly-Once Delivery
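-- Idempotent upsert pattern. Deduplicate the staging rows first so each
-- event_id appears at most once in the MERGE source; otherwise redelivered
-- duplicates either insert twice or make the MERGE fail on duplicate matches
MERGE INTO target_table t
USING (
    SELECT *
    FROM staging_table
    QUALIFY ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY ingestion_time DESC) = 1
) s
ON t.event_id = s.event_id
WHEN MATCHED AND s.ingestion_time > t.ingestion_time THEN
    UPDATE SET
        t.payload = s.payload,
        t.ingestion_time = s.ingestion_time
WHEN NOT MATCHED THEN
    INSERT (event_id, event_type, user_id, event_timestamp, payload, ingestion_time)
    VALUES (s.event_id, s.event_type, s.user_id, s.event_timestamp, s.payload, s.ingestion_time);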
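The property that makes the upsert idempotent is that running the same batch twice leaves the target unchanged. The in-memory sketch below makes that property easy to check. It keeps the newest staging row per `event_id`, like `QUALIFY ROW_NUMBER() ... = 1`, and then applies the MERGE's update-if-newer and insert-if-missing rules. Dictionaries stand in for tables here; this is not how Snowflake executes MERGE.

```python
from typing import Dict, List


def dedupe_latest(staging: List[dict]) -> Dict[str, dict]:
    """Keep one row per event_id: the one with the greatest ingestion_time."""
    latest: Dict[str, dict] = {}
    for row in staging:
        current = latest.get(row["event_id"])
        if current is None or row["ingestion_time"] > current["ingestion_time"]:
            latest[row["event_id"]] = row
    return latest


def merge(target: Dict[str, dict], staging: List[dict]) -> None:
    """Apply the MERGE rules in place: update newer matches, insert new keys."""
    for event_id, s in dedupe_latest(staging).items():
        t = target.get(event_id)
        if t is None:
            target[event_id] = dict(s)                    # WHEN NOT MATCHED THEN INSERT
        elif s["ingestion_time"] > t["ingestion_time"]:
            t["payload"] = s["payload"]                   # WHEN MATCHED AND newer THEN UPDATE
            t["ingestion_time"] = s["ingestion_time"]


target = {"e1": {"event_id": "e1", "payload": "old", "ingestion_time": 1}}
batch = [
    {"event_id": "e1", "payload": "new", "ingestion_time": 2},
    {"event_id": "e2", "payload": "first", "ingestion_time": 2},
    {"event_id": "e2", "payload": "retry", "ingestion_time": 3},  # redelivered duplicate
]
merge(target, batch)
merge(target, batch)  # re-running the same batch changes nothing
print(target["e1"]["payload"], target["e2"]["payload"])  # new retry
```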
Monitoring Dashboard
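-- Real-time ingestion metrics for the last hour.
-- Latency is measured as the gap between event time and load time,
-- because the table has no separate processing-time column.
SELECT
    DATE_TRUNC('minute', ingestion_time) AS ingest_minute,
    COUNT(*) AS rows_ingested,
    COUNT(DISTINCT event_type) AS event_types,
    AVG(DATEDIFF(millisecond, event_timestamp, ingestion_time)) AS avg_latency_ms
FROM streaming_events
WHERE ingestion_time >= DATEADD(hour, -1, CURRENT_TIMESTAMP())
GROUP BY 1
ORDER BY 1 DESC;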
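The same per-minute rollup can be reproduced over a handful of rows to sanity-check dashboard numbers. The sketch assumes latency is defined as `ingestion_time - event_timestamp`, since the table defines no processing-time column. For brevity it omits the one-hour filter. The sample rows are made up.

```python
from collections import defaultdict
from datetime import datetime


def per_minute_metrics(rows):
    """Group rows by ingestion minute and compute rows, distinct types, and avg latency (ms)."""
    buckets = defaultdict(list)
    for r in rows:
        minute = r["ingestion_time"].replace(second=0, microsecond=0)  # DATE_TRUNC('minute', ...)
        buckets[minute].append(r)
    result = []
    for minute in sorted(buckets, reverse=True):  # ORDER BY 1 DESC
        group = buckets[minute]
        latencies = [
            (r["ingestion_time"] - r["event_timestamp"]).total_seconds() * 1000 for r in group
        ]
        result.append({
            "minute": minute,
            "rows_ingested": len(group),
            "event_types": len({r["event_type"] for r in group}),
            "avg_latency_ms": sum(latencies) / len(latencies),
        })
    return result


rows = [
    {"event_type": "click", "event_timestamp": datetime(2024, 1, 1, 12, 0, 0),
     "ingestion_time": datetime(2024, 1, 1, 12, 0, 2)},
    {"event_type": "view", "event_timestamp": datetime(2024, 1, 1, 12, 0, 10),
     "ingestion_time": datetime(2024, 1, 1, 12, 0, 14)},
    {"event_type": "click", "event_timestamp": datetime(2024, 1, 1, 12, 1, 0),
     "ingestion_time": datetime(2024, 1, 1, 12, 1, 1)},
]
for m in per_minute_metrics(rows):
    print(m["minute"].strftime("%H:%M"), m["rows_ingested"], m["event_types"], m["avg_latency_ms"])
# 12:01 1 1 1000.0
# 12:00 2 2 3000.0
```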
- Snowpipe Streaming writes rows directly to tables, with latency measured in seconds rather than minutes
- Kafka connector provides production-grade streaming integration
- Channel monitoring is critical for streaming health
- Exactly-once semantics require idempotent patterns
- Use hybrid approaches for balanced latency and throughput