Snowflake Real-Time Data Ingestion
Snowflake provides multiple mechanisms for real-time data ingestion, making newly arrived data available for analytics and operations within seconds to minutes.
Snowpipe for Continuous Ingestion
Setting Up Snowpipe
-- Create file format for the JSON event files
CREATE OR REPLACE FILE FORMAT json_format
TYPE = 'JSON'
STRIP_OUTER_ARRAY = TRUE;  -- load each element of a top-level JSON array as its own row

-- Create an EXTERNAL stage. AUTO_INGEST pipes are triggered by cloud storage
-- event notifications (S3 / GCS / Azure), which exist only for external stages.
-- Files PUT into an internal stage must instead be submitted through the
-- Snowpipe REST API, using a pipe created with AUTO_INGEST = FALSE.
-- Replace the URL and storage integration with your own.
CREATE OR REPLACE STAGE streaming_stage
URL = 's3://my-bucket/events/'
STORAGE_INTEGRATION = my_s3_integration
FILE_FORMAT = (FORMAT_NAME = 'json_format');

-- Create Snowpipe
-- NOTE(review): a plain COPY of JSON requires raw_events to have a single
-- VARIANT column. For a multi-column table, add MATCH_BY_COLUMN_NAME or a
-- transforming SELECT. Confirm this against the raw_events definition.
CREATE OR REPLACE PIPE my_pipe
AUTO_INGEST = TRUE
COMMENT = 'Real-time data ingestion pipe'
AS
COPY INTO raw_events
FROM @streaming_stage
FILE_FORMAT = (FORMAT_NAME = 'json_format')
ON_ERROR = CONTINUE;  -- load valid rows and skip bad ones (Snowpipe's default is SKIP_FILE)
-- Get the pipe's notification channel (the notification_channel column holds
-- the queue ARN to configure in the cloud storage event notification)
SHOW PIPES LIKE 'MY_PIPE';

-- Check the current pipe status. SYSTEM$PIPE_STATUS returns a JSON string.
SELECT
    PARSE_JSON(SYSTEM$PIPE_STATUS('my_pipe')) AS status_json,
    status_json:executionState::STRING AS execution_state,
    status_json:pendingFileCount::NUMBER AS pending_file_count,
    status_json:lastIngestedTimestamp::TIMESTAMP_LTZ AS last_ingested_at;

-- Credits, bytes and files consumed by the pipe over the last 24 hours
SELECT
    pipe_name,
    start_time,
    end_time,
    credits_used,
    bytes_inserted,
    files_inserted
FROM TABLE(INFORMATION_SCHEMA.PIPE_USAGE_HISTORY(
    DATE_RANGE_START => DATEADD('hour', -24, CURRENT_TIMESTAMP()),
    PIPE_NAME => 'MY_PIPE'
))
ORDER BY start_time DESC;
Snowpipe Performance Tuning
-- Monitor per-file load results for files loaded by the pipe in the last hour.
-- COPY_HISTORY requires the target table name; PIPE_NAME narrows it to this pipe.
-- (Files still queued are reported by pendingFileCount in SYSTEM$PIPE_STATUS.)
SELECT
    pipe_name,
    file_name,
    status,
    row_count,
    row_parsed,
    file_size,
    first_error_message,
    pipe_received_time,
    last_load_time,
    DATEDIFF('second', pipe_received_time, last_load_time) AS ingest_latency_sec
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
    TABLE_NAME => 'RAW_EVENTS',
    START_TIME => DATEADD('hour', -1, CURRENT_TIMESTAMP())
))
WHERE pipe_name = 'MY_PIPE'
ORDER BY last_load_time DESC;

-- Pipe maintenance. A pipe's only settable properties are
-- PIPE_EXECUTION_PAUSED and COMMENT. Load behaviour lives in the file format
-- and the pipe's COPY statement (ERROR_ON_COLUMN_COUNT_MISMATCH, for example,
-- is a CSV file-format option). To change the COPY statement, recreate the pipe.
-- REFRESH queues staged files from the last 7 days that have not been loaded
-- yet, e.g. files whose event notifications were missed.
ALTER PIPE my_pipe REFRESH;
Apache Kafka Integration
Kafka Connector Setup
-- The Snowflake Kafka connector runs inside Kafka Connect and is configured
-- with a connector properties file, not with Snowflake DDL. On the Snowflake
-- side you provision only the role, the user, and the objects it writes to.
-- my_db / my_schema are placeholders.

-- Role and user the connector authenticates as (key-pair authentication)
CREATE ROLE IF NOT EXISTS kafka_connector_role;
GRANT USAGE ON DATABASE my_db TO ROLE kafka_connector_role;
GRANT USAGE, CREATE TABLE ON SCHEMA my_db.my_schema TO ROLE kafka_connector_role;
CREATE USER IF NOT EXISTS kafka_connector_user
    DEFAULT_ROLE = kafka_connector_role
    RSA_PUBLIC_KEY = '<public_key>';
GRANT ROLE kafka_connector_role TO USER kafka_connector_user;

-- Create target table.
-- With Snowpipe Streaming and schematization enabled, top-level JSON keys of
-- each message are written to matching columns, and RECORD_METADATA receives
-- topic/partition/offset information.
-- NOTE(review): the connector's role needs the table privileges listed in the
-- connector docs for the chosen ingestion method (e.g. ownership for schema
-- evolution). Confirm before deploying.
CREATE TABLE IF NOT EXISTS kafka_events (
    record_metadata VARIANT,
    event_id STRING,
    event_type STRING,
    event_time TIMESTAMP_NTZ,
    payload VARIANT
)
ENABLE_SCHEMA_EVOLUTION = TRUE;

-- Example connector configuration (Kafka Connect side, not SQL):
--   connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
--   topics=events_topic
--   snowflake.topic2table.map=events_topic:kafka_events
--   snowflake.url.name=<account_identifier>.snowflakecomputing.com:443
--   snowflake.user.name=kafka_connector_user
--   snowflake.private.key=<private_key>
--   snowflake.role.name=kafka_connector_role
--   snowflake.database.name=my_db
--   snowflake.schema.name=my_schema
--   snowflake.ingestion.method=SNOWPIPE_STREAMING
--   snowflake.enable.schematization=true
--   value.converter=org.apache.kafka.connect.json.JsonConverter
--   value.converter.schemas.enable=false
--   consumer.override.auto.offset.reset=latest
-- Broker address, SSL settings (security.protocol, ssl.*) and the consumer
-- group (derived from the connector name) are Kafka Connect worker/connector
-- settings. Snowflake never sees them.
Kafka Data Processing
-- Create an append-only stream on the Kafka landing table. The connector
-- presumably only inserts rows, and the downstream task reads only INSERT rows.
-- An append-only stream therefore suffices, and it is cheaper than a standard
-- stream because it does not compute net updates and deletes.
CREATE STREAM kafka_event_stream
ON TABLE kafka_events
APPEND_ONLY = TRUE;
-- Process Kafka events every minute.
-- Both statements below read kafka_event_stream. Inside a scripting block,
-- each DML statement autocommits, and a stream's offset advances when a DML
-- statement that read it commits. Without an explicit transaction, the INSERT
-- would consume the change set and the MERGE would always read an empty stream.
-- BEGIN TRANSACTION ... COMMIT makes both statements see the same rows and
-- advances the offset exactly once.
CREATE OR REPLACE TASK process_kafka_events
WAREHOUSE = compute_wh
SCHEDULE = 'USING CRON * * * * * America/New_York'
-- Skip the run (and the warehouse resume) when no new events have arrived
WHEN SYSTEM$STREAM_HAS_DATA('KAFKA_EVENT_STREAM')
AS
EXECUTE IMMEDIATE
$$
BEGIN
    BEGIN TRANSACTION;

    -- Process new events
    INSERT INTO processed_events (
        event_id,
        event_type,
        event_time,
        processed_at
    )
    SELECT
        event_id,
        event_type,
        event_time,
        CURRENT_TIMESTAMP()
    FROM kafka_event_stream
    WHERE METADATA$ACTION = 'INSERT';

    -- Update hourly aggregations from the same change set
    MERGE INTO hourly_metrics AS target
    USING (
        SELECT
            DATE_TRUNC('hour', event_time) AS event_hour,
            event_type,
            COUNT(*) AS event_count
        FROM kafka_event_stream
        WHERE METADATA$ACTION = 'INSERT'
        GROUP BY DATE_TRUNC('hour', event_time), event_type
    ) AS source
    ON target.event_hour = source.event_hour
    AND target.event_type = source.event_type
    WHEN MATCHED THEN
        UPDATE SET event_count = target.event_count + source.event_count
    WHEN NOT MATCHED THEN
        INSERT (event_hour, event_type, event_count)
        VALUES (source.event_hour, source.event_type, source.event_count);

    COMMIT;
EXCEPTION
    WHEN OTHER THEN
        -- Undo both statements so a failed run does not consume the stream
        ROLLBACK;
        RAISE;
END;
$$;

-- Tasks are created suspended; resume the task to start its schedule
ALTER TASK process_kafka_events RESUME;
Streams for Real-Time Tracking
Creating Streams
-- Create a standard stream on orders: records inserts, updates and deletes
CREATE STREAM order_changes
ON TABLE orders
APPEND_ONLY = FALSE
SHOW_INITIAL_ROWS = FALSE;

-- Create a standard stream on customers.
-- Streams have no per-operation switches (INSERT / UPDATE / DELETE are not
-- stream properties). A standard stream (APPEND_ONLY = FALSE) captures all
-- three operations; APPEND_ONLY = TRUE captures inserts only.
-- Streams also cannot be limited to specific columns. To expose only some
-- columns, create the stream ON VIEW over a view that selects them (change
-- tracking must be enabled on the base table).
CREATE STREAM customer_changes
ON TABLE customers
APPEND_ONLY = FALSE
SHOW_INITIAL_ROWS = FALSE;

-- Query stream data. A plain SELECT does not consume the stream; only a DML
-- statement that reads it advances the offset. SELECT * already returns the
-- METADATA$ columns; they are aliased here for readability.
SELECT
    *,
    METADATA$ACTION AS change_action,
    METADATA$ISUPDATE AS is_update,
    METADATA$ROW_ID AS row_id
FROM order_changes
WHERE METADATA$ACTION = 'INSERT';
Processing Stream Data
-- Sync orders into orders_archive every minute with ONE statement, so the
-- change set is read, and the stream offset advanced, exactly once.
--
-- A standard stream never reports METADATA$ACTION = 'UPDATE'. An update
-- appears as a DELETE/INSERT pair with METADATA$ISUPDATE = TRUE. Therefore:
--   * INSERT rows (new rows, or the new values of updates) -> upsert
--   * DELETE rows with ISUPDATE = FALSE (real deletes)     -> soft delete
--   * DELETE rows with ISUPDATE = TRUE (old update values) -> ignored
CREATE OR REPLACE TASK sync_orders
WAREHOUSE = compute_wh
SCHEDULE = 'USING CRON * * * * * America/New_York'
-- Skip the run (and the warehouse resume) when there are no changes
WHEN SYSTEM$STREAM_HAS_DATA('ORDER_CHANGES')
AS
MERGE INTO orders_archive AS target
USING (
    SELECT
        order_id,
        customer_id,
        order_date,
        amount,
        METADATA$ACTION AS action
    FROM order_changes
    WHERE METADATA$ACTION = 'INSERT'
       OR (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE = FALSE)
    -- Keep at most one source row per order. A delete plus a re-insert of the
    -- same order_id within one interval would otherwise match the target twice.
    -- The INSERT row (current state) wins.
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY order_id
        ORDER BY CASE WHEN METADATA$ACTION = 'INSERT' THEN 0 ELSE 1 END
    ) = 1
) AS source
ON target.order_id = source.order_id
-- Handle deletes (soft delete)
WHEN MATCHED AND source.action = 'DELETE' THEN
    UPDATE SET deleted_at = CURRENT_TIMESTAMP()
-- Handle updates, and re-inserts of previously archived orders
WHEN MATCHED AND source.action = 'INSERT' THEN
    UPDATE SET
        customer_id = source.customer_id,
        order_date = source.order_date,
        amount = source.amount,
        updated_at = CURRENT_TIMESTAMP(),
        deleted_at = NULL  -- the row exists in the source again
-- Handle new orders
WHEN NOT MATCHED AND source.action = 'INSERT' THEN
    INSERT (order_id, customer_id, order_date, amount, created_at)
    VALUES (source.order_id, source.customer_id, source.order_date, source.amount, CURRENT_TIMESTAMP());

-- Tasks are created suspended; resume the task to start its schedule
ALTER TASK sync_orders RESUME;
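Streams do not copy data. A stream stores an offset into its source table's version history and uses the table's change-tracking metadata to return the rows changed since that offset, which makes streams cheap to maintain for real-time change capture. A stream depends on the source table's Time Travel retention: DATA_RETENTION_TIME_IN_DAYS, which Snowflake can extend up to MAX_DATA_EXTENSION_TIME_IN_DAYS (14 days by default) for unconsumed streams. A stream that is not consumed within that window becomes stale and must be recreated.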
Real-Time Analytics
Real-Time Dashboards
-- Real-time metrics view: per-minute event counts and distinct users for each
-- event type, over the hour preceding the time the view is queried.
CREATE OR REPLACE SECURE VIEW realtime_dashboard AS
WITH last_hour AS (
    SELECT
        DATE_TRUNC('minute', event_time) AS event_minute,
        event_type,
        user_id
    FROM events
    WHERE event_time >= DATEADD('hour', -1, CURRENT_TIMESTAMP())
)
SELECT
    event_minute,
    event_type,
    COUNT(*) AS event_count,
    COUNT(DISTINCT user_id) AS unique_users
FROM last_hour
GROUP BY event_minute, event_type;
-- Create real-time aggregation task.
-- Each run counts the events of the previous complete minute, using an aligned,
-- half-open window [minute_start, minute_start + 1 minute). A sliding
-- "now - 1 minute" window drifts with the actual run time, so consecutive
-- runs would overlap (double counting) or leave gaps.
-- metric_time is the start of the minute that was measured.
-- Events that land after their minute has been aggregated are not counted.
CREATE OR REPLACE TASK realtime_aggregation
WAREHOUSE = compute_wh
SCHEDULE = 'USING CRON * * * * * America/New_York'
AS
INSERT INTO realtime_metrics (
    metric_time,
    metric_name,
    metric_value
)
SELECT
    DATEADD('minute', -1, DATE_TRUNC('minute', CURRENT_TIMESTAMP())) AS metric_time,
    'events_per_minute' AS metric_name,
    COUNT(*) AS metric_value
FROM events
WHERE event_time >= DATEADD('minute', -1, DATE_TRUNC('minute', CURRENT_TIMESTAMP()))
  AND event_time < DATE_TRUNC('minute', CURRENT_TIMESTAMP());

-- Tasks are created suspended; resume the task to start its schedule
ALTER TASK realtime_aggregation RESUME;
Window-Based Aggregations
-- Sliding-window view. For every event it reports:
--   events_last_5min     - events of the same type in the preceding 5 minutes
--   avg_amount_last_hour - average amount of same-type events in the preceding hour
-- Both RANGE frames end at CURRENT ROW and so include the row's ORDER BY peers.
CREATE OR REPLACE VIEW sliding_window_metrics AS
WITH typed_events AS (
    SELECT
        event_time,
        event_type,
        amount
    FROM events
)
SELECT
    te.event_time,
    te.event_type,
    COUNT(*) OVER (
        PARTITION BY te.event_type
        ORDER BY te.event_time
        RANGE BETWEEN INTERVAL '5 minutes' PRECEDING AND CURRENT ROW
    ) AS events_last_5min,
    AVG(te.amount) OVER (
        PARTITION BY te.event_type
        ORDER BY te.event_time
        RANGE BETWEEN INTERVAL '1 hour' PRECEDING AND CURRENT ROW
    ) AS avg_amount_last_hour
FROM typed_events AS te;
-- Tumbling (non-overlapping) one-hour windows: event count and total amount
-- per event type for each hour bucket. window_end is exclusive.
WITH bucketed AS (
    SELECT
        DATE_TRUNC('hour', event_time) AS window_start,
        event_type,
        amount
    FROM events
)
SELECT
    window_start,
    DATEADD('hour', 1, window_start) AS window_end,
    event_type,
    COUNT(*) AS event_count,
    SUM(amount) AS total_amount
FROM bucketed
GROUP BY window_start, event_type;
Performance Optimization
-- Monitor real-time pipeline performance: stream/task-related queries from the
-- last hour, slowest first. QUERY_HISTORY returns only RESULT_LIMIT rows
-- (default 100) before the WHERE filter is applied, so raise the limit.
-- EXECUTION_TIME and TOTAL_ELAPSED_TIME are in milliseconds.
SELECT
    query_id,
    query_text,
    execution_time AS execution_time_ms,
    total_elapsed_time AS total_elapsed_time_ms,
    rows_produced,
    bytes_scanned
FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY(
    END_TIME_RANGE_START => DATEADD('hour', -1, CURRENT_TIMESTAMP()),
    RESULT_LIMIT => 10000
))
WHERE query_text ILIKE '%STREAM%'
   OR query_text ILIKE '%TASK%'
ORDER BY execution_time DESC;

-- Task runs over the last hour. A task's query text is its body, not the word
-- "TASK", so TASK_HISTORY is the reliable view of run state and duration.
SELECT
    name,
    state,
    scheduled_time,
    query_start_time,
    completed_time,
    DATEDIFF('second', query_start_time, completed_time) AS run_seconds,
    error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    SCHEDULED_TIME_RANGE_START => DATEADD('hour', -1, CURRENT_TIMESTAMP())
))
ORDER BY scheduled_time DESC;
-- Optimize the warehouse used by the streaming tasks (point the tasks'
-- WAREHOUSE setting at it).
-- STANDARD scaling starts an extra cluster as soon as queries begin to queue,
-- which keeps latency low. ECONOMY waits until it estimates about 6 minutes of
-- work for a new cluster, which delays real-time workloads.
-- Multi-cluster warehouses (MAX_CLUSTER_COUNT > 1) require Enterprise Edition
-- or higher.
ALTER WAREHOUSE streaming_wh SET
MIN_CLUSTER_COUNT = 1,
MAX_CLUSTER_COUNT = 4,
SCALING_POLICY = 'STANDARD',
AUTO_SUSPEND = 60,
AUTO_RESUME = TRUE;
-- Monitor stream health. A stream that is not consumed before its stale_after
-- time becomes stale and must be recreated.
SHOW STREAMS IN SCHEMA;

SELECT
    "name" AS stream_name,
    "table_name" AS source_table,
    "mode" AS stream_mode,
    "stale" AS is_stale,
    "stale_after" AS stale_after,
    DATEDIFF('hour', CURRENT_TIMESTAMP(), "stale_after") AS hours_until_stale
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
ORDER BY "stale_after";

-- Backlog: change rows not yet consumed by the tasks. Selecting from a stream
-- does not advance its offset, so this check is safe to run at any time.
SELECT 'ORDER_CHANGES' AS stream_name, COUNT(*) AS pending_rows FROM order_changes
UNION ALL
SELECT 'KAFKA_EVENT_STREAM' AS stream_name, COUNT(*) AS pending_rows FROM kafka_event_stream;
Real-Time Ingestion Best Practices
| Pattern | Use Case | Latency | Throughput |
|---|---|---|---|
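| Snowpipe | File-based ingestion | ~1 min (typical, not guaranteed) | High |
| Kafka Connector (Snowpipe Streaming) | Event streaming | Seconds (minutes with Snowpipe-based ingestion) | Very High |
| Streams + Tasks | CDC processing | ~1 min (bounded by the task schedule) | Medium |
| External Functions | Calling remote APIs from SQL (enrichment, not bulk ingestion) | Depends on the remote service | Low |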
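Snowflake is not designed for sub-second end-to-end latency. For the lowest ingestion latency (typically a few seconds), use Snowpipe Streaming, either through its SDK or through the Kafka connector with `snowflake.ingestion.method=SNOWPIPE_STREAMING`. External Functions call remote services from SQL and are not an ingestion mechanism. For most real-time analytics use cases, Snowpipe and Streams provide sufficient performance.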
Key Takeaways:
- Snowpipe provides continuous, automated file-based ingestion
- Kafka integration enables event-driven data streaming
- Streams track changes for real-time CDC processing
- Tasks orchestrate real-time data transformations
- Window functions enable real-time analytics
- Performance optimization through warehouse scaling