Snowflake Streaming Data Ingestion

Snowflake supports real-time data ingestion through Snowpipe Streaming, enabling low-latency data delivery for time-sensitive analytics and operational workloads.

Architecture Overview

<svg width="800" height="450" viewBox="0 0 800 450" xmlns="http://www.w3.org/2000/svg">
  <defs>
    <linearGradient id="streamGrad" x1="0%" y1="0%" x2="100%" y2="0%">
      <stop offset="0%" style="stop-color:#FF6B6B;stop-opacity:1" />
      <stop offset="100%" style="stop-color:#FF8E8E;stop-opacity:1" />
    </linearGradient>
    <linearGradient id="pipeGrad" x1="0%" y1="0%" x2="100%" y2="0%">
      <stop offset="0%" style="stop-color:#4ECDC4;stop-opacity:1" />
      <stop offset="100%" style="stop-color:#7EDDD6;stop-opacity:1" />
    </linearGradient>
  </defs>

  <text x="400" y="30" text-anchor="middle" font-size="18" font-weight="bold" fill="#333">Snowflake Streaming Ingestion Architecture</text>
  <rect x="30" y="60" width="150" height="120" rx="10" fill="#6C5CE7" opacity="0.9"/>
  <text x="105" y="85" text-anchor="middle" font-size="12" fill="white" font-weight="bold">Data Sources</text>
  <text x="105" y="110" text-anchor="middle" font-size="10" fill="white">IoT Sensors</text>
  <text x="105" y="125" text-anchor="middle" font-size="10" fill="white">App Events</text>
  <text x="105" y="140" text-anchor="middle" font-size="10" fill="white">Logs</text>
  <text x="105" y="155" text-anchor="middle" font-size="10" fill="white">Transactions</text>
  <path d="M180 120 L230 120" stroke="#333" stroke-width="2" fill="none" marker-end="url(#arrowStream)"/>
  <rect x="230" y="60" width="150" height="120" rx="10" fill="url(#streamGrad)" opacity="0.9"/>
  <text x="305" y="85" text-anchor="middle" font-size="12" fill="white" font-weight="bold">Message Queue</text>
  <text x="305" y="110" text-anchor="middle" font-size="10" fill="white">Kafka</text>
  <text x="305" y="125" text-anchor="middle" font-size="10" fill="white">Kinesis</text>
  <text x="305" y="140" text-anchor="middle" font-size="10" fill="white">Event Hub</text>
  <text x="305" y="155" text-anchor="middle" font-size="10" fill="white">Pub/Sub</text>
  <path d="M380 120 L430 120" stroke="#333" stroke-width="2" fill="none" marker-end="url(#arrowStream)"/>
  <rect x="430" y="60" width="150" height="120" rx="10" fill="url(#pipeGrad)" opacity="0.9"/>
  <text x="505" y="85" text-anchor="middle" font-size="12" fill="white" font-weight="bold">Connector Layer</text>
  <text x="505" y="110" text-anchor="middle" font-size="10" fill="white">Snowpipe Streaming</text>
  <text x="505" y="125" text-anchor="middle" font-size="10" fill="white">Kafka Connector</text>
  <text x="505" y="140" text-anchor="middle" font-size="10" fill="white">REST API</text>
  <text x="505" y="155" text-anchor="middle" font-size="10" fill="white">SDK</text>
  <path d="M580 120 L630 120" stroke="#333" stroke-width="2" fill="none" marker-end="url(#arrowStream)"/>
  <rect x="630" y="60" width="140" height="120" rx="10" fill="#29B5E8" opacity="0.9"/>
  <text x="700" y="85" text-anchor="middle" font-size="12" fill="white" font-weight="bold">Snowflake</text>
  <text x="700" y="110" text-anchor="middle" font-size="10" fill="white">Staging Table</text>
  <text x="700" y="125" text-anchor="middle" font-size="10" fill="white">Snowpipe</text>
  <text x="700" y="140" text-anchor="middle" font-size="10" fill="white">Target Table</text>
  <text x="700" y="155" text-anchor="middle" font-size="10" fill="white">Streams</text>
  <rect x="230" y="220" width="540" height="80" rx="10" fill="#F39C12" opacity="0.9"/>
  <text x="500" y="245" text-anchor="middle" font-size="14" fill="white" font-weight="bold">Streaming Latency Targets</text>
  <text x="350" y="275" text-anchor="middle" font-size="11" fill="white">Real-time: &lt;1s</text>
  <text x="500" y="275" text-anchor="middle" font-size="11" fill="white">Near-real-time: 1-5s</text>
  <text x="650" y="275" text-anchor="middle" font-size="11" fill="white">Micro-batch: 5-60s</text>
  <rect x="30" y="330" width="180" height="90" rx="10" fill="#27AE60" opacity="0.85"/>
  <text x="120" y="355" text-anchor="middle" font-size="12" fill="white" font-weight="bold">Benefits</text>
  <text x="120" y="375" text-anchor="middle" font-size="10" fill="white">Low latency ingestion</text>
  <text x="120" y="390" text-anchor="middle" font-size="10" fill="white">No file staging needed</text>
  <text x="120" y="405" text-anchor="middle" font-size="10" fill="white">Real-time analytics</text>

  <rect x="230" y="330" width="180" height="90" rx="10" fill="#8E44AD" opacity="0.85"/>
  <text x="320" y="355" text-anchor="middle" font-size="12" fill="white" font-weight="bold">Features</text>
  <text x="320" y="375" text-anchor="middle" font-size="10" fill="white">Exactly-once delivery</text>
  <text x="320" y="390" text-anchor="middle" font-size="10" fill="white">Schema evolution</text>
  <text x="320" y="405" text-anchor="middle" font-size="10" fill="white">Auto-scaling</text>

  <rect x="430" y="330" width="180" height="90" rx="10" fill="#E74C3C" opacity="0.85"/>
  <text x="520" y="355" text-anchor="middle" font-size="12" fill="white" font-weight="bold">Use Cases</text>
  <text x="520" y="375" text-anchor="middle" font-size="10" fill="white">IoT analytics</text>
  <text x="520" y="390" text-anchor="middle" font-size="10" fill="white">Fraud detection</text>
  <text x="520" y="405" text-anchor="middle" font-size="10" fill="white">Real-time dashboards</text>

  <defs>
    <marker id="arrowStream" markerWidth="10" markerHeight="10" refX="9" refY="3" orient="auto" markerUnits="strokeWidth">
      <path d="M0,0 L0,6 L9,3 z" fill="#333"/>
    </marker>
  </defs>
</svg>
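
The latency targets in the diagram can be expressed as a small helper for labelling measured end-to-end latencies. This is a minimal sketch: the tier names and bounds come from the diagram, while the function name and the choice to treat exactly 5 seconds as near-real-time are assumptions made for illustration.

```python
def classify_latency(seconds: float) -> str:
    """Map an observed end-to-end latency (in seconds) to a diagram tier."""
    if seconds < 0:
        raise ValueError("latency cannot be negative")
    if seconds < 1:
        return "real-time"
    if seconds <= 5:
        # Assumption: the shared 5s boundary is counted as near-real-time.
        return "near-real-time"
    if seconds <= 60:
        return "micro-batch"
    return "outside streaming targets"
```

A measured latency of 0.4 seconds falls in the real-time tier, while anything above 60 seconds falls outside the streaming targets shown in the diagram.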

Snowpipe Streaming

REST API Setup

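-- Create the target table for streamed events
CREATE TABLE streaming_events (
  event_id VARCHAR(36),
  event_type VARCHAR(50),
  user_id INT,
  event_timestamp TIMESTAMP_NTZ,
  payload VARIANT,
  ingestion_time TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

-- Create a file-based Snowpipe for auto-ingestion.
-- AUTO_INGEST pipes load staged files when cloud event notifications arrive;
-- this is distinct from Snowpipe Streaming, which writes rows through channels.
-- Assumes @streaming_stage is an existing external stage holding JSON files
-- whose field names match the columns below.
CREATE OR REPLACE PIPE streaming_pipe
  AUTO_INGEST = TRUE
  AS
  COPY INTO streaming_events (event_id, event_type, user_id, event_timestamp, payload)
  FROM (
    -- JSON arrives as a single VARIANT ($1), so project each field explicitly;
    -- ingestion_time is omitted and filled by its column default.
    SELECT
      $1:event_id::VARCHAR,
      $1:event_type::VARCHAR,
      $1:user_id::INT,
      $1:event_timestamp::TIMESTAMP_NTZ,
      $1:payload
    FROM @streaming_stage
  )
  FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = TRUE)
  ON_ERROR = 'CONTINUE';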

Kafka Connector Configuration

{
  "name": "snowflake-connector",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
    "snowflake.url.name": "${SNOWFLAKE_URL}",
    "snowflake.user.name": "${SNOWFLAKE_USER}",
    "snowflake.database.name": "my_database",
    "snowflake.schema.name": "streaming_schema",
    "snowflake.role.name": "streaming_role",
    "snowflake.authenticator": "snowflake_jwt",
    "snowflake.private.key": "${SNOWFLAKE_PRIVATE_KEY}",
    "topics": "events-topic",
    "snowflake.topic2table.map": "events-topic:streaming_events",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}
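
A connector definition shaped like the JSON above (a top-level "name" plus a "config" object) is typically registered by POSTing it to the Kafka Connect REST API at /connectors. The sketch below only builds the HTTP request, so it can be inspected without a running cluster. The function name and the worker URL http://localhost:8083 are assumptions for illustration; 8083 is the usual default Connect port, but your deployment may differ.

```python
import json
import urllib.request


def build_register_request(connect_url: str, connector: dict) -> urllib.request.Request:
    """Build (but do not send) a POST /connectors request for Kafka Connect."""
    # Kafka Connect expects both a connector name and a config object.
    if "name" not in connector or "config" not in connector:
        raise ValueError("connector definition needs 'name' and 'config'")
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url=connect_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical, trimmed-down connector definition for demonstration only.
example_connector = {
    "name": "snowflake-connector",
    "config": {"topics": "events-topic"},
}
request = build_register_request("http://localhost:8083/", example_connector)
```

Passing the request to urllib.request.urlopen would submit it to a live Connect worker. Secrets such as the private key are better supplied through the worker's config provider than embedded in the payload.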

Performance Tuning

[Chart: Streaming Throughput]

[Chart: End-to-End Latency]

Channel Management

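-- Check streaming channels
-- Note: confirm that this table function and its column names are available
-- in your account; SHOW CHANNELS is the documented command for listing
-- Snowpipe Streaming channels.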
SELECT *
FROM TABLE(INFORMATION_SCHEMA.STREAMING_INGESTION_CHANNELS(
  DATABASE_NAME => 'my_database',
  SCHEMA_NAME => 'streaming_schema'
));

-- Monitor channel health
SELECT
  channel_name,
  channel_state,
  earliest_commit_ts,
  latest_commit_ts,
  rows_inserted,
  bytes_inserted
FROM TABLE(INFORMATION_SCHEMA.STREAMING_INGESTION_CHANNELS(
  DATABASE_NAME => 'my_database',
  SCHEMA_NAME => 'streaming_schema'
));

Batch vs Streaming Comparison

Feature    | Batch (Snowpipe) | Streaming | Hybrid
Latency    | 1-5 minutes      | <1 second | 1-60 seconds
Throughput | Very High        | High      | Medium-High
Cost       | Lower            | Higher    | Medium
Complexity | Low              | High      | Medium
Use Case   | Historical loads | Real-time | Operational

For streaming workloads, monitor channel health regularly. Unhealthy channels can cause data delays. Set up alerts for channel lag exceeding your SLA thresholds.
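
The alerting advice above can be sketched as a lag check over channel metadata. This example assumes you have already fetched (channel_name, latest_commit_ts) pairs by whatever means your account supports. The function name, the sample channel names, and the 30-second threshold are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def find_lagging_channels(channels, now, max_lag):
    """Return (channel_name, lag) pairs whose last commit is older than max_lag.

    channels: iterable of (channel_name, latest_commit_ts) tuples.
    Results are sorted with the most delayed channel first.
    """
    lagging = []
    for name, latest_commit_ts in channels:
        lag = now - latest_commit_ts
        if lag > max_lag:
            lagging.append((name, lag))
    return sorted(lagging, key=lambda item: item[1], reverse=True)


# Hypothetical sample data: one healthy channel and two delayed ones.
now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
sample_channels = [
    ("channel_a", now - timedelta(seconds=2)),
    ("channel_b", now - timedelta(seconds=45)),
    ("channel_c", now - timedelta(minutes=5)),
]
alerts = find_lagging_channels(sample_channels, now, max_lag=timedelta(seconds=30))
```

Here alerts contains channel_c followed by channel_b. In practice, set the threshold to your SLA and route the result to your paging or notification system.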

Exactly-Once Delivery

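-- Idempotent upsert pattern
-- Deduplicate staged rows first so each event_id appears once; replayed
-- events would otherwise make the MERGE nondeterministic or insert duplicates.
MERGE INTO target_table t
USING (
  SELECT *
  FROM staging_table
  QUALIFY ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY ingestion_time DESC) = 1
) s
ON t.event_id = s.event_id
WHEN MATCHED AND s.ingestion_time > t.ingestion_time THEN
  UPDATE SET
    t.payload = s.payload,
    t.ingestion_time = s.ingestion_time
WHEN NOT MATCHED THEN
  INSERT (event_id, event_type, user_id, event_timestamp, payload, ingestion_time)
  VALUES (s.event_id, s.event_type, s.user_id, s.event_timestamp, s.payload, s.ingestion_time);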
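
To see why this pattern is idempotent, the sketch below mimics the MERGE logic on an in-memory dictionary keyed by event_id. It is an illustration of the semantics only, not how Snowflake executes a MERGE. The function name and sample events are hypothetical, and ingestion_time is an integer for brevity.

```python
def merge_events(target: dict, batch: list) -> dict:
    """Apply a batch of events to target using the upsert rules of the MERGE.

    - Unknown event_id: insert the whole event.
    - Known event_id with a newer ingestion_time: update payload and ingestion_time.
    - Otherwise: leave the existing row untouched.
    """
    for event in batch:
        current = target.get(event["event_id"])
        if current is None:
            target[event["event_id"]] = dict(event)
        elif event["ingestion_time"] > current["ingestion_time"]:
            current["payload"] = event["payload"]
            current["ingestion_time"] = event["ingestion_time"]
    return target


# Hypothetical batch that contains a replayed (newer) copy of event "e1".
batch = [
    {"event_id": "e1", "event_type": "click", "payload": {"v": 1}, "ingestion_time": 100},
    {"event_id": "e2", "event_type": "view", "payload": {"v": 2}, "ingestion_time": 101},
    {"event_id": "e1", "event_type": "click", "payload": {"v": 3}, "ingestion_time": 105},
]
once = merge_events({}, batch)
twice = merge_events(merge_events({}, batch), batch)
```

Applying the batch a second time leaves the state unchanged (once == twice), which is the property that makes at-least-once delivery safe to replay.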

Monitoring Dashboard

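-- Real-time ingestion metrics
-- Latency is computed from event time to ingestion time
-- (assumes both timestamps are recorded in the same time zone).
SELECT
  DATE_TRUNC('minute', ingestion_time) AS minute,
  COUNT(*) AS rows_ingested,
  COUNT(DISTINCT event_type) AS event_types,
  AVG(DATEDIFF('millisecond', event_timestamp, ingestion_time)) AS avg_latency_ms
FROM streaming_events
WHERE ingestion_time >= DATEADD(hour, -1, CURRENT_TIMESTAMP())
GROUP BY 1
ORDER BY 1 DESC;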
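
The same per-minute rollup can be reproduced client-side, for example to check dashboard numbers against a small sample of rows. This is a sketch under assumptions: events are plain dictionaries with event_type, event_timestamp, and ingestion_time keys, latency is taken as ingestion_time minus event_timestamp, and the function name is hypothetical.

```python
from collections import defaultdict
from datetime import datetime


def ingestion_metrics(events):
    """Group events by ingestion minute and compute simple ingestion metrics."""
    buckets = defaultdict(list)
    for event in events:
        minute = event["ingestion_time"].replace(second=0, microsecond=0)
        buckets[minute].append(event)

    rows = []
    for minute in sorted(buckets, reverse=True):  # newest minute first
        group = buckets[minute]
        latencies_ms = [
            (e["ingestion_time"] - e["event_timestamp"]).total_seconds() * 1000
            for e in group
        ]
        rows.append({
            "minute": minute,
            "rows_ingested": len(group),
            "event_types": len({e["event_type"] for e in group}),
            "avg_latency_ms": sum(latencies_ms) / len(latencies_ms),
        })
    return rows


# Hypothetical sample: two events in 12:00 and one event in 12:01.
sample_events = [
    {"event_type": "click", "event_timestamp": datetime(2024, 1, 1, 12, 0, 10),
     "ingestion_time": datetime(2024, 1, 1, 12, 0, 10, 500000)},
    {"event_type": "view", "event_timestamp": datetime(2024, 1, 1, 12, 0, 20),
     "ingestion_time": datetime(2024, 1, 1, 12, 0, 21, 500000)},
    {"event_type": "click", "event_timestamp": datetime(2024, 1, 1, 12, 1, 5),
     "ingestion_time": datetime(2024, 1, 1, 12, 1, 6)},
]
metrics = ingestion_metrics(sample_events)
```

The first row of metrics covers 12:01 with one event, and the second covers 12:00 with two events across two event types.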

Key Takeaways

  • Snowpipe Streaming enables sub-second latency ingestion
  • The Kafka connector provides production-grade streaming integration
  • Channel monitoring is critical for streaming health
  • Exactly-once semantics require idempotent patterns
  • Use hybrid approaches to balance latency and throughput
