CW

Snowflake Real-Time Data Ingestion

Free Lesson

Advertisement

Snowflake Real-Time Data Ingestion

Snowflake provides multiple mechanisms for real-time data ingestion, enabling near-instantaneous data availability for analytics and operations.

Snowpipe for Continuous Ingestion

Setting Up Snowpipe

-- Create file format for streaming.
-- STRIP_OUTER_ARRAY = TRUE splits a top-level JSON array into one row per element.
CREATE OR REPLACE FILE FORMAT json_format
  TYPE = 'JSON'
  STRIP_OUTER_ARRAY = TRUE;

-- Create external stage.
-- AUTO_INGEST requires an external stage whose cloud storage sends event
-- notifications to Snowflake; it is not supported on internal stages (files
-- in an internal stage must be submitted through the Snowpipe REST API).
-- Replace the URL and storage integration below with your own.
CREATE OR REPLACE STAGE streaming_stage
  URL = 's3://my-bucket/events/'
  STORAGE_INTEGRATION = my_s3_integration
  FILE_FORMAT = (FORMAT_NAME = 'json_format');

-- Create Snowpipe.
-- ON_ERROR = CONTINUE keeps loading a file when some of its rows fail to parse.
-- NOTE(review): a JSON COPY without a transformation produces a single VARIANT
-- column, so raw_events is assumed to have one VARIANT column (or use a
-- SELECT transformation / MATCH_BY_COLUMN_NAME) — confirm the table schema.
CREATE OR REPLACE PIPE my_pipe
  AUTO_INGEST = TRUE
  COMMENT = 'Real-time data ingestion pipe'
AS
  COPY INTO raw_events
  FROM @streaming_stage
  FILE_FORMAT = (FORMAT_NAME = 'json_format')
  ON_ERROR = CONTINUE;

-- Get the pipe's notification channel (the notification_channel column, an
-- SQS queue ARN on AWS) to configure cloud event notifications.
SHOW PIPES LIKE 'my_pipe';

-- Check current pipe status.
-- PIPE_USAGE_HISTORY has no status or file-count columns; the live state
-- comes from SYSTEM$PIPE_STATUS, which returns a JSON string.
SELECT
  pipe_status_json:executionState::STRING AS pipe_status,
  pipe_status_json:lastIngestedTimestamp::TIMESTAMP_LTZ AS last_load_time,
  pipe_status_json:pendingFileCount::NUMBER AS pending_file_count
FROM (
  SELECT PARSE_JSON(SYSTEM$PIPE_STATUS('my_pipe')) AS pipe_status_json
);

-- Credits and volume loaded by the pipe over the last 24 hours.
-- PIPE_USAGE_HISTORY takes DATE_RANGE_START/DATE_RANGE_END (not START_TIME).
-- NOTE(review): PIPE_NAME may need to be fully qualified (db.schema.pipe).
SELECT
  pipe_name,
  start_time,
  end_time,
  credits_used,
  bytes_inserted,
  files_inserted
FROM TABLE(INFORMATION_SCHEMA.PIPE_USAGE_HISTORY(
  DATE_RANGE_START => DATEADD('hour', -24, CURRENT_TIMESTAMP()),
  PIPE_NAME => 'MY_PIPE'
))
ORDER BY start_time;

Snowpipe Performance Tuning

-- Monitor pipe metrics.
-- INFORMATION_SCHEMA has no COPY_USAGE_HISTORY function: per-file load
-- results (including Snowpipe loads) come from COPY_HISTORY, which requires
-- the target table name. Queue depth is not reported here; read it from
-- SYSTEM$PIPE_STATUS('my_pipe') (the pendingFileCount field).
SELECT
  pipe_name,
  file_name,
  file_size,
  row_count,
  status,
  pipe_received_time,
  last_load_time
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'RAW_EVENTS',
  START_TIME => DATEADD('hour', -1, CURRENT_TIMESTAMP())
))
WHERE pipe_name = 'MY_PIPE'
ORDER BY last_load_time DESC;

-- Pipes expose no throughput settings. ALTER PIPE ... SET accepts only a few
-- properties (such as PIPE_EXECUTION_PAUSED and COMMENT), and
-- ERROR_ON_COLUMN_COUNT_MISMATCH is a CSV file-format option, not a pipe
-- property. Load latency and cost are driven mainly by file size; Snowflake
-- recommends files of roughly 100-250 MB compressed.
-- REFRESH queues staged files the pipe has not loaded yet (for example, files
-- staged before notifications were configured). It only considers files
-- staged within the last 7 days.
ALTER PIPE my_pipe REFRESH;

Apache Kafka Integration

Kafka Connector Setup

-- Kafka ingestion uses the Snowflake Connector for Kafka, which runs inside
-- Kafka Connect; Snowflake has no CREATE STREAMING INTEGRATION statement.
-- On the Snowflake side you create a role and a user for the connector. The
-- broker, topic, SSL and offset settings go in the Kafka Connect configuration.

-- Role the connector runs as (replace my_db / my_schema with your names).
CREATE ROLE IF NOT EXISTS kafka_connector_role;
GRANT USAGE ON DATABASE my_db TO ROLE kafka_connector_role;
GRANT USAGE, CREATE TABLE, CREATE STAGE, CREATE PIPE
  ON SCHEMA my_db.my_schema TO ROLE kafka_connector_role;

-- Service user for the connector; the connector authenticates with a key pair.
CREATE USER IF NOT EXISTS kafka_connector_user
  DEFAULT_ROLE = kafka_connector_role
  RSA_PUBLIC_KEY = 'MIIBIjANBgkq...';  -- replace with your public key body
GRANT ROLE kafka_connector_role TO USER kafka_connector_user;

-- Connector properties (Kafka Connect configuration, not SQL), for example:
--   connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
--   topics=events_topic
--   snowflake.topic2table.map=events_topic:kafka_events
--   snowflake.url.name=<account_identifier>.snowflakecomputing.com:443
--   snowflake.user.name=kafka_connector_user
--   snowflake.private.key=<private key>
--   snowflake.role.name=kafka_connector_role
--   snowflake.database.name=my_db
--   snowflake.schema.name=my_schema
--   snowflake.ingestion.method=SNOWPIPE_STREAMING
--   snowflake.enable.schematization=true
--   value.converter=org.apache.kafka.connect.json.JsonConverter
--   value.converter.schemas.enable=false
-- The broker address and SSL settings (bootstrap.servers,
-- security.protocol=SSL, ssl.keystore.* / ssl.truststore.*) belong in the
-- Kafka Connect worker configuration.
-- The consumer group defaults to connect-<connector name>.
-- The offset reset policy can be set per connector with
-- consumer.override.auto.offset.reset=latest. This only works if the worker
-- allows client config overrides.

-- Create target table.
-- NOTE(review): these typed columns assume schematization is enabled, so the
-- connector maps top-level JSON fields to columns. With default settings the
-- connector writes RECORD_METADATA / RECORD_CONTENT VARIANT columns instead.
-- The connector role may also need ownership of a pre-existing table —
-- confirm against the connector documentation.
CREATE TABLE IF NOT EXISTS kafka_events (
  event_id STRING,
  event_type STRING,
  event_time TIMESTAMP_NTZ,
  payload VARIANT
);

Kafka Data Processing

-- Create stream on Kafka table.
-- kafka_events is only appended to by the connector, so an append-only stream
-- is sufficient and more efficient than a standard stream. Every row it returns
-- has METADATA$ACTION = 'INSERT'.
-- IF NOT EXISTS avoids replacing an existing stream, which would reset its
-- offset and drop unconsumed changes.
CREATE STREAM IF NOT EXISTS kafka_event_stream
  ON TABLE kafka_events
  APPEND_ONLY = TRUE;

-- Process Kafka events in near real time.
-- The WHEN condition is checked without using the warehouse; runs are
-- skipped while the stream has no new rows.
CREATE OR REPLACE TASK process_kafka_events
  WAREHOUSE = 'COMPUTE_WH'
  SCHEDULE = 'USING CRON * * * * * America/New_York'
  WHEN SYSTEM$STREAM_HAS_DATA('kafka_event_stream')
AS
  BEGIN
    -- Both statements below read kafka_event_stream. A stream's offset
    -- advances when a DML statement that reads it commits. In autocommit mode
    -- the INSERT would consume the rows and the MERGE would see an empty stream.
    -- Inside one explicit transaction both statements see the same change set,
    -- and the offset advances once, at COMMIT.
    BEGIN TRANSACTION;

    -- Process new events
    INSERT INTO processed_events (
      event_id,
      event_type,
      event_time,
      processed_at
    )
    SELECT
      event_id,
      event_type,
      event_time,
      CURRENT_TIMESTAMP()
    FROM kafka_event_stream
    WHERE METADATA$ACTION = 'INSERT';

    -- Update hourly aggregations with the counts from this batch
    MERGE INTO hourly_metrics AS target
    USING (
      SELECT
        DATE_TRUNC('hour', event_time) AS event_hour,
        event_type,
        COUNT(*) AS event_count
      FROM kafka_event_stream
      WHERE METADATA$ACTION = 'INSERT'
      GROUP BY DATE_TRUNC('hour', event_time), event_type
    ) AS source
    ON target.event_hour = source.event_hour
      AND target.event_type = source.event_type
    WHEN MATCHED THEN
      UPDATE SET event_count = target.event_count + source.event_count
    WHEN NOT MATCHED THEN
      INSERT (event_hour, event_type, event_count)
      VALUES (source.event_hour, source.event_type, source.event_count);

    COMMIT;
  EXCEPTION
    WHEN OTHER THEN
      -- Undo partial work so the stream offset does not advance and the next run retries.
      ROLLBACK;
      RAISE;
  END;

-- Tasks are created suspended; resume to start the schedule.
ALTER TASK process_kafka_events RESUME;

Streams for Real-Time Tracking

Creating Streams

-- Create stream on source table.
-- A standard stream (APPEND_ONLY = FALSE) records inserts, updates and deletes.
-- With SHOW_INITIAL_ROWS = FALSE (the default), the first read returns only
-- changes made after the stream was created.
CREATE STREAM order_changes
  ON TABLE orders
  APPEND_ONLY = FALSE
  SHOW_INITIAL_ROWS = FALSE;

-- Create stream capturing inserts, updates and deletes on customers.
-- Streams have no INSERT/UPDATE/DELETE options: a standard stream captures all
-- three, and APPEND_ONLY = TRUE restricts it to inserts. To track only some
-- columns, create the stream on a view that selects those columns. Change
-- tracking must be enabled on that view and its base tables.
CREATE STREAM customer_changes
  ON TABLE customers
  APPEND_ONLY = FALSE
  SHOW_INITIAL_ROWS = FALSE;

-- Query stream data.
-- A plain SELECT does not advance the stream offset; only a committed DML
-- statement that reads the stream does. An UPDATE appears as a DELETE/INSERT
-- pair with METADATA$ISUPDATE = TRUE. This filter therefore returns both new
-- rows and the new versions of updated rows.
SELECT
  order_id,
  customer_id,
  order_date,
  amount,
  METADATA$ACTION AS change_action,
  METADATA$ISUPDATE AS is_update,
  METADATA$ROW_ID AS row_id
FROM order_changes
WHERE METADATA$ACTION = 'INSERT';

Processing Stream Data

-- Process stream with task.
-- A single MERGE applies inserts, updates and soft deletes:
-- * In a standard stream an UPDATE is reported as a DELETE row plus an INSERT
--   row, both with METADATA$ISUPDATE = TRUE; METADATA$ACTION is never 'UPDATE'.
-- * Keeping one row per order_id (preferring the INSERT row) maps each change
--   to exactly one action. This keeps the MERGE deterministic and prevents the
--   DELETE half of an update from soft-deleting the order.
-- * Using one statement means every action sees the same change set before
--   the stream offset advances.
CREATE OR REPLACE TASK sync_orders
  WAREHOUSE = 'COMPUTE_WH'
  SCHEDULE = 'USING CRON * * * * * America/New_York'
  WHEN SYSTEM$STREAM_HAS_DATA('order_changes')
AS
  MERGE INTO orders_archive AS target
  USING (
    SELECT
      order_id,
      customer_id,
      order_date,
      amount,
      METADATA$ACTION AS action
    FROM order_changes
    -- Updates (DELETE + INSERT pair) and delete-then-reinsert keep the INSERT
    -- row, i.e. the current version; a pure delete keeps its DELETE row.
    QUALIFY ROW_NUMBER() OVER (
      PARTITION BY order_id
      ORDER BY CASE WHEN METADATA$ACTION = 'INSERT' THEN 0 ELSE 1 END
    ) = 1
  ) AS source
  ON target.order_id = source.order_id
  -- Deleted in the source: soft delete in the archive.
  WHEN MATCHED AND source.action = 'DELETE' THEN
    UPDATE SET deleted_at = CURRENT_TIMESTAMP()
  -- Updated (or re-inserted) in the source: refresh the archived copy and
  -- clear any earlier soft delete, since the row exists again.
  WHEN MATCHED AND source.action = 'INSERT' THEN
    UPDATE SET
      customer_id = source.customer_id,
      order_date = source.order_date,
      amount = source.amount,
      updated_at = CURRENT_TIMESTAMP(),
      deleted_at = NULL
  -- New order: add it to the archive.
  WHEN NOT MATCHED AND source.action = 'INSERT' THEN
    INSERT (order_id, customer_id, order_date, amount, created_at)
    VALUES (source.order_id, source.customer_id, source.order_date, source.amount, CURRENT_TIMESTAMP());

-- Tasks are created suspended; resume to start the schedule.
ALTER TASK sync_orders RESUME;

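A stream contains no table data. It stores only an offset into the source table's version history and returns change records from that history when queried, which makes it efficient for real-time change capture. Because the changes come from the table's Time Travel history, a stream that is not consumed within the table's data retention period becomes stale and must be recreated. For unconsumed streams, Snowflake extends that period up to MAX_DATA_EXTENSION_TIME_IN_DAYS, which is 14 days by default.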

Real-Time Analytics

Real-Time Dashboards

-- Create real-time metrics view.
-- For the trailing hour (relative to when the view is queried), this reports
-- per-minute event counts and distinct users for each event type.
CREATE OR REPLACE SECURE VIEW realtime_dashboard AS
WITH recent_events AS (
  -- Events from the last hour, bucketed to the minute
  SELECT
    DATE_TRUNC('minute', event_time) AS event_minute,
    event_type,
    user_id
  FROM events
  WHERE event_time >= DATEADD('hour', -1, CURRENT_TIMESTAMP())
)
SELECT
  event_minute,
  event_type,
  COUNT(*) AS event_count,
  COUNT(DISTINCT user_id) AS unique_users
FROM recent_events
GROUP BY event_minute, event_type;

-- Create real-time aggregation task.
-- Each run counts the events in the most recent complete minute, using the
-- half-open interval [minute_start, minute_start + 1 minute). A window
-- measured back from the run time would drift with task start jitter and could
-- double-count or skip events near the boundaries.
-- NOTE(review): events that arrive after their minute has been aggregated are
-- not counted, and a skipped run leaves a gap. Aggregate an older minute if
-- ingestion lags.
CREATE OR REPLACE TASK realtime_aggregation
  WAREHOUSE = 'COMPUTE_WH'
  SCHEDULE = 'USING CRON * * * * * America/New_York'
AS
  INSERT INTO realtime_metrics (
    metric_time,
    metric_name,
    metric_value
  )
  SELECT
    -- metric_time is the start of the aggregated minute, not the run time.
    DATEADD('minute', -1, DATE_TRUNC('minute', CURRENT_TIMESTAMP())) AS metric_time,
    'events_per_minute' AS metric_name,
    COUNT(*) AS metric_value
  FROM events
  WHERE event_time >= DATEADD('minute', -1, DATE_TRUNC('minute', CURRENT_TIMESTAMP()))
    AND event_time < DATE_TRUNC('minute', CURRENT_TIMESTAMP());

-- Tasks are created suspended; resume to start the schedule.
ALTER TASK realtime_aggregation RESUME;

Window-Based Aggregations

-- Create sliding window aggregation.
-- For every event row, this computes two values over events of the same type:
-- * events_last_5min: the number of events in the 5 minutes up to and
--   including its event_time;
-- * avg_amount_last_hour: the average amount over the preceding hour.
-- RANGE frames are time-based, so rows with the same event_time (peers) fall
-- in the same frame.
CREATE OR REPLACE VIEW sliding_window_metrics AS
SELECT
  e.event_time,
  e.event_type,
  COUNT(*) OVER (
    PARTITION BY e.event_type
    ORDER BY e.event_time
    RANGE BETWEEN INTERVAL '5 minutes' PRECEDING AND CURRENT ROW
  ) AS events_last_5min,
  AVG(e.amount) OVER (
    PARTITION BY e.event_type
    ORDER BY e.event_time
    RANGE BETWEEN INTERVAL '1 hour' PRECEDING AND CURRENT ROW
  ) AS avg_amount_last_hour
FROM events AS e;

-- Create tumbling window aggregation.
-- Non-overlapping one-hour windows per event type: each event falls into
-- exactly one [window_start, window_end) bucket.
WITH hourly_buckets AS (
  SELECT
    DATE_TRUNC('hour', event_time) AS window_start,
    event_type,
    amount
  FROM events
)
SELECT
  window_start,
  DATEADD('hour', 1, window_start) AS window_end,
  event_type,
  COUNT(*) AS event_count,
  SUM(amount) AS total_amount
FROM hourly_buckets
GROUP BY window_start, event_type;

Performance Optimization

-- Monitor real-time pipeline performance.
-- * QUERY_HISTORY takes END_TIME_RANGE_START / END_TIME_RANGE_END, not START_TIME.
-- * It reports EXECUTION_TIME in milliseconds; there is no EXECUTION_TIME_MS column.
-- * It returns at most RESULT_LIMIT rows (default 100) before the WHERE clause
--   runs, so the limit is raised to its maximum.
-- * ILIKE makes the text match case-insensitive.
-- * The last predicate excludes this monitoring query itself, which would
--   otherwise match.
-- For per-run task details, INFORMATION_SCHEMA.TASK_HISTORY is more precise.
SELECT
  query_id,
  query_text,
  execution_time AS execution_time_ms,
  rows_produced,
  bytes_scanned
FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY(
  END_TIME_RANGE_START => DATEADD('hour', -1, CURRENT_TIMESTAMP()),
  RESULT_LIMIT => 10000
))
WHERE (query_text ILIKE '%STREAM%' OR query_text ILIKE '%TASK%')
  AND query_text NOT ILIKE '%INFORMATION_SCHEMA.QUERY_HISTORY%'
ORDER BY execution_time DESC;

-- Optimize warehouse for streaming.
-- * STANDARD scaling starts additional clusters as soon as queries begin to
--   queue, which suits latency-sensitive workloads. ECONOMY keeps running
--   clusters fully loaded to save credits and can leave queries queued.
-- * Multi-cluster settings (MAX_CLUSTER_COUNT > 1) require Enterprise Edition
--   or higher.
-- * AUTO_SUSPEND is in seconds.
-- NOTE(review): the tasks in this lesson specify COMPUTE_WH. Point them at
-- streaming_wh if this warehouse is meant to serve the streaming workload.
ALTER WAREHOUSE streaming_wh SET
  MIN_CLUSTER_COUNT = 1,
  MAX_CLUSTER_COUNT = 4,
  SCALING_POLICY = 'STANDARD',
  AUTO_SUSPEND = 60;

-- Monitor stream health.
-- There is no STREAM_USAGE_HISTORY function. SHOW STREAMS reports whether each
-- stream is stale and when it will become stale if it is not consumed.
SHOW STREAMS;
SELECT
  "name" AS stream_name,
  "table_name",
  "stale",
  "stale_after",
  TIMESTAMPDIFF('hour', CURRENT_TIMESTAMP(), "stale_after") AS hours_until_stale
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
ORDER BY "stale_after";

-- Monitor stream lag for one pipeline: the number of unconsumed rows and the
-- age of the oldest unconsumed event. Selecting from a stream does not
-- advance its offset.
-- NOTE(review): event_time is TIMESTAMP_NTZ; this assumes it is recorded in the
-- session time zone — confirm before relying on lag_minutes.
SELECT
  'KAFKA_EVENT_STREAM' AS stream_name,
  COUNT(*) AS pending_rows,
  TIMESTAMPDIFF('minute', MIN(event_time), CURRENT_TIMESTAMP()) AS lag_minutes
FROM kafka_event_stream;

Real-Time Ingestion Best Practices

PatternUse CaseLatencyThroughput
SnowpipeFile-based ingestion< 1 minHigh
Kafka ConnectorEvent streaming< 5 secVery High
Streams + TasksCDC processing< 1 minMedium
External FunctionsAPI integration< 10 secLow

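Snowflake is not designed for sub-second end-to-end latency. The lowest-latency ingestion path is Snowpipe Streaming, used directly through its SDK or through the Kafka connector in SNOWPIPE_STREAMING mode; it typically makes rows queryable within seconds. External Functions call remote services from SQL and do not speed up ingestion. If you need sub-second reactions, handle them in the streaming layer before the data reaches Snowflake. For most real-time analytics use cases, Snowpipe and Streams provide sufficient performance.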

Key Takeaways:

  • Snowpipe provides continuous, automated file-based ingestion
  • Kafka integration enables event-driven data streaming
  • Streams track changes for real-time CDC processing
  • Tasks orchestrate real-time data transformations
  • Window functions enable real-time analytics
  • Performance optimization through warehouse scaling

Advertisement

Need Expert Snowflake Help?

Get personalized warehouse optimization, data modeling, or Snowflake platform consulting.

Advertisement