Snowflake Change Data Capture (CDC)

Change Data Capture (CDC) in Snowflake uses Streams to track changes and Tasks to apply them, enabling efficient data synchronization between source and target systems.

Architecture Overview

<svg width="800" height="450" viewBox="0 0 800 450" xmlns="http://www.w3.org/2000/svg">
  <defs>
    <linearGradient id="srcGrad" x1="0%" y1="0%" x2="100%" y2="0%">
      <stop offset="0%" style="stop-color:#3498DB;stop-opacity:1" />
      <stop offset="100%" style="stop-color:#5DADE2;stop-opacity:1" />
    </linearGradient>
    <linearGradient id="tgtGrad" x1="0%" y1="0%" x2="100%" y2="0%">
      <stop offset="0%" style="stop-color:#2ECC71;stop-opacity:1" />
      <stop offset="100%" style="stop-color:#58D68D;stop-opacity:1" />
    </linearGradient>
  </defs>

  <text x="400" y="30" text-anchor="middle" font-size="18" font-weight="bold" fill="#333">Snowflake CDC Architecture</text>
  <rect x="30" y="60" width="160" height="150" rx="10" fill="url(#srcGrad)" opacity="0.9"/>
  <text x="110" y="85" text-anchor="middle" font-size="14" fill="white" font-weight="bold">Source Table</text>
  <text x="110" y="110" text-anchor="middle" font-size="11" fill="white">INSERT</text>
  <text x="110" y="125" text-anchor="middle" font-size="11" fill="white">UPDATE</text>
  <text x="110" y="140" text-anchor="middle" font-size="11" fill="white">DELETE</text>
  <text x="110" y="165" text-anchor="middle" font-size="11" fill="white">MERGE</text>
  <text x="110" y="185" text-anchor="middle" font-size="10" fill="white" font-style="italic">Traditional DML</text>
  <path d="M190 135 L230 135" stroke="#333" stroke-width="2" fill="none" marker-end="url(#arrowCDC)"/>
  <rect x="230" y="60" width="160" height="150" rx="10" fill="#F39C12" opacity="0.9"/>
  <text x="310" y="85" text-anchor="middle" font-size="14" fill="white" font-weight="bold">Stream</text>
  <text x="310" y="110" text-anchor="middle" font-size="11" fill="white">Metadata Tracking</text>
  <text x="310" y="125" text-anchor="middle" font-size="11" fill="white">Change Types</text>
  <text x="310" y="140" text-anchor="middle" font-size="11" fill="white">Transaction Time</text>
  <text x="310" y="155" text-anchor="middle" font-size="11" fill="white">Log Offset</text>
  <text x="310" y="185" text-anchor="middle" font-size="10" fill="white" font-style="italic">Delta Table</text>
  <path d="M390 135 L430 135" stroke="#333" stroke-width="2" fill="none" marker-end="url(#arrowCDC)"/>
  <rect x="430" y="60" width="160" height="150" rx="10" fill="#9B59B6" opacity="0.9"/>
  <text x="510" y="85" text-anchor="middle" font-size="14" fill="white" font-weight="bold">Task</text>
  <text x="510" y="110" text-anchor="middle" font-size="11" fill="white">Schedule Execution</text>
  <text x="510" y="125" text-anchor="middle" font-size="11" fill="white">MERGE Operation</text>
  <text x="510" y="140" text-anchor="middle" font-size="11" fill="white">Error Handling</text>
  <text x="510" y="155" text-anchor="middle" font-size="11" fill="white">Retry Logic</text>
  <text x="510" y="185" text-anchor="middle" font-size="10" fill="white" font-style="italic">Automated Pipeline</text>
  <path d="M590 135 L630 135" stroke="#333" stroke-width="2" fill="none" marker-end="url(#arrowCDC)"/>
  <rect x="630" y="60" width="140" height="150" rx="10" fill="url(#tgtGrad)" opacity="0.9"/>
  <text x="700" y="85" text-anchor="middle" font-size="14" fill="white" font-weight="bold">Target Table</text>
  <text x="700" y="110" text-anchor="middle" font-size="11" fill="white">Current State</text>
  <text x="700" y="125" text-anchor="middle" font-size="11" fill="white">Historical</text>
  <text x="700" y="140" text-anchor="middle" font-size="11" fill="white">Aggregated</text>
  <text x="700" y="155" text-anchor="middle" font-size="11" fill="white">Derived</text>
  <text x="700" y="185" text-anchor="middle" font-size="10" fill="white" font-style="italic">Analytics Ready</text>
  <rect x="30" y="240" width="740" height="180" rx="10" fill="#ECF0F1" opacity="0.9"/>
  <text x="400" y="265" text-anchor="middle" font-size="14" fill="#333" font-weight="bold">Change Types and Metadata</text>

  <rect x="60" y="280" width="170" height="120" rx="8" fill="white"/>
  <text x="145" y="300" text-anchor="middle" font-size="12" fill="#333" font-weight="bold">INSERT</text>
  <text x="145" y="320" text-anchor="middle" font-size="10" fill="#666">METADATA$ACTION = 'INSERT'</text>
  <text x="145" y="340" text-anchor="middle" font-size="10" fill="#666">METADATA$ISUPDATE = FALSE</text>
  <text x="145" y="360" text-anchor="middle" font-size="10" fill="#666">METADATA$ROW_ID = NEW</text>
  <text x="145" y="380" text-anchor="middle" font-size="10" fill="#666">New record added</text>

  <rect x="260" y="280" width="170" height="120" rx="8" fill="white"/>
  <text x="345" y="300" text-anchor="middle" font-size="12" fill="#333" font-weight="bold">UPDATE</text>
  <text x="345" y="320" text-anchor="middle" font-size="10" fill="#666">METADATA$ACTION = 'INSERT'</text>
  <text x="345" y="340" text-anchor="middle" font-size="10" fill="#666">METADATA$ISUPDATE = TRUE</text>
  <text x="345" y="360" text-anchor="middle" font-size="10" fill="#666">METADATA$ROW_ID = SAME</text>
  <text x="345" y="380" text-anchor="middle" font-size="10" fill="#666">Paired with a DELETE row</text>

  <rect x="460" y="280" width="170" height="120" rx="8" fill="white"/>
  <text x="545" y="300" text-anchor="middle" font-size="12" fill="#333" font-weight="bold">DELETE</text>
  <text x="545" y="320" text-anchor="middle" font-size="10" fill="#666">METADATA$ACTION = 'DELETE'</text>
  <text x="545" y="340" text-anchor="middle" font-size="10" fill="#666">METADATA$ISUPDATE = FALSE</text>
  <text x="545" y="360" text-anchor="middle" font-size="10" fill="#666">METADATA$ROW_ID = DELETED</text>
  <text x="545" y="380" text-anchor="middle" font-size="10" fill="#666">Record removed</text>

  <defs>
    <marker id="arrowCDC" markerWidth="10" markerHeight="10" refX="9" refY="3" orient="auto" markerUnits="strokeWidth">
      <path d="M0,0 L0,6 L9,3 z" fill="#333"/>
    </marker>
  </defs>
</svg>

Key Concepts

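Stream: an object that records the DML changes (inserts, updates, deletes) made to a table, along with metadata describing each change.

CDC Latency: the delay between a change landing in the source table and that change being applied to the target.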

Creating CDC Pipelines

Basic Stream Creation

-- Standard stream: tracks inserts, updates, and deletes
CREATE OR REPLACE STREAM cdc_stream
  ON TABLE source_table
  APPEND_ONLY = FALSE
  SHOW_INITIAL_ROWS = TRUE  -- the first read also returns rows that already existed
  COMMENT = 'CDC stream for source_table';

-- Append-only stream: tracks inserts only (cheaper when updates and deletes don't matter)
CREATE OR REPLACE STREAM append_only_stream
  ON TABLE source_table
  APPEND_ONLY = TRUE
  SHOW_INITIAL_ROWS = FALSE;

-- Stream on an external table (must be insert-only)
CREATE OR REPLACE STREAM external_stream
  ON EXTERNAL TABLE external_table
  INSERT_ONLY = TRUE;
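
To see what a stream records, it can help to query it directly. The sketch below assumes source_table has the id and name columns used in the MERGE later in this lesson. A plain SELECT reads the stream without advancing its offset, so it is safe to run while debugging.

```sql
-- Inspect pending changes without consuming them
-- (the offset only advances when a DML statement that reads the stream commits)
SELECT
  id,
  name,
  METADATA$ACTION,    -- 'INSERT' or 'DELETE'
  METADATA$ISUPDATE,  -- TRUE when the row is one half of an UPDATE
  METADATA$ROW_ID     -- identifier of the source row
FROM cdc_stream
ORDER BY id, METADATA$ACTION;
```

In a standard stream, an UPDATE on the source shows up as two rows for the same id: a DELETE row holding the old values and an INSERT row holding the new ones, both with METADATA$ISUPDATE = TRUE.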

CDC Task with MERGE

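-- Create an empty target table with the same columns as the source
CREATE TABLE target_table AS
SELECT * FROM source_table WHERE 1=0;

-- Create CDC task
CREATE OR REPLACE TASK cdc_task
  WAREHOUSE = compute_wh
  SCHEDULE = '1 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('cdc_stream')
AS
  MERGE INTO target_table t
  USING (
    -- An UPDATE arrives as a DELETE + INSERT pair; drop the DELETE half
    -- so each id appears once and the INSERT half carries the new values
    SELECT *
    FROM cdc_stream
    WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE = TRUE)
  ) s
  ON t.id = s.id
  WHEN MATCHED AND s.METADATA$ACTION = 'DELETE' THEN
    DELETE
  WHEN MATCHED AND s.METADATA$ACTION = 'INSERT' THEN
    UPDATE SET t.name = s.name, t.value = s.value, t.updated_at = s.updated_at
  WHEN NOT MATCHED AND s.METADATA$ACTION = 'INSERT' THEN
    INSERT (id, name, value, updated_at)
    VALUES (s.id, s.name, s.value, s.updated_at);

-- Tasks are created suspended; resume the task to start the schedule
ALTER TASK cdc_task RESUME;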
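
After resuming the task, it is worth confirming that it actually runs. The following is a minimal sketch using EXECUTE TASK and the INFORMATION_SCHEMA.TASK_HISTORY table function; it assumes the task was created in the current database and schema and that your role has the privileges to run and monitor it.

```sql
-- Trigger one run now instead of waiting for the next scheduled run
EXECUTE TASK cdc_task;

-- Review the most recent runs and any error messages
SELECT name, state, scheduled_time, completed_time, error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'CDC_TASK'))
ORDER BY scheduled_time DESC
LIMIT 10;
```

Rows with a state of SKIPPED typically mean the WHEN condition was false, that is, the stream had no data at the scheduled time.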

Advanced CDC Patterns

SCD Type 2 Pattern

-- Create SCD Type 2 target
CREATE TABLE customer_scd (
  customer_id INT,
  name VARCHAR(100),
  email VARCHAR(200),
  valid_from TIMESTAMP_NTZ,
  valid_to TIMESTAMP_NTZ,
  is_current BOOLEAN DEFAULT TRUE
);

-- SCD2 merge pattern
-- Closes the current version when attributes change and inserts brand-new customers.
-- The replacement version of a changed customer must still be inserted in a separate step.
MERGE INTO customer_scd t
USING (
  SELECT
    id AS customer_id,  -- alias the source key to match the target key used below
    name,
    email,
    CURRENT_TIMESTAMP() as valid_from,
    '9999-12-31'::TIMESTAMP_NTZ as valid_to,
    TRUE as is_current
  FROM cdc_stream
  WHERE METADATA$ACTION = 'INSERT'
) s
ON t.customer_id = s.customer_id AND t.is_current = TRUE
WHEN MATCHED AND (t.name != s.name OR t.email != s.email) THEN
  UPDATE SET
    t.valid_to = s.valid_from,
    t.is_current = FALSE
WHEN NOT MATCHED THEN
  INSERT (customer_id, name, email, valid_from, valid_to, is_current)
  VALUES (s.customer_id, s.name, s.email, s.valid_from, s.valid_to, s.is_current);
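
A single MERGE can either close the old version of a changed customer or insert a row, but not both for the same key. As an alternative, here is a sketch that does both in one transaction, relying on the fact that statements inside a transaction see the same stream contents and that the offset advances only at COMMIT. It assumes the stream exposes id, name, and email columns, and it does not handle deletes.

```sql
BEGIN;

-- Step 1: close the current version of customers whose attributes changed
UPDATE customer_scd
SET valid_to = CURRENT_TIMESTAMP(),
    is_current = FALSE
FROM cdc_stream s
WHERE customer_scd.customer_id = s.id
  AND customer_scd.is_current = TRUE
  AND s.METADATA$ACTION = 'INSERT'
  -- IS DISTINCT FROM treats NULLs as comparable values, unlike !=
  AND (customer_scd.name IS DISTINCT FROM s.name
       OR customer_scd.email IS DISTINCT FROM s.email);

-- Step 2: insert a current version for every customer that now lacks one
-- (brand-new customers plus the customers closed out in step 1)
INSERT INTO customer_scd (customer_id, name, email, valid_from, valid_to, is_current)
SELECT s.id, s.name, s.email, CURRENT_TIMESTAMP(), '9999-12-31'::TIMESTAMP_NTZ, TRUE
FROM cdc_stream s
WHERE s.METADATA$ACTION = 'INSERT'
  AND NOT EXISTS (
    SELECT 1
    FROM customer_scd c
    WHERE c.customer_id = s.id
      AND c.is_current = TRUE
  );

COMMIT;
```

Because each statement evaluates CURRENT_TIMESTAMP() separately, the closed row's valid_to and the new row's valid_from can differ by a few milliseconds; capture the timestamp once if exact continuity matters.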

Multi-Table CDC

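-- One stream per source table
CREATE STREAM orders_stream ON TABLE orders;
CREATE STREAM customers_stream ON TABLE customers;
CREATE STREAM products_stream ON TABLE products;

-- Single task for multiple streams
-- Snowflake's MERGE needs explicit column lists. The non-key columns below
-- (status, name, price) are placeholders; substitute the real columns.
CREATE OR REPLACE TASK multi_cdc_task
  WAREHOUSE = compute_wh
  SCHEDULE = '5 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
    OR SYSTEM$STREAM_HAS_DATA('customers_stream')
    OR SYSTEM$STREAM_HAS_DATA('products_stream')
AS
BEGIN
  -- Process orders (drop the DELETE half of each update so every key appears once)
  MERGE INTO orders_target t
  USING (
    SELECT * FROM orders_stream
    WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE = TRUE)
  ) s
  ON t.order_id = s.order_id
  WHEN MATCHED AND s.METADATA$ACTION = 'DELETE' THEN DELETE
  WHEN MATCHED AND s.METADATA$ACTION = 'INSERT' THEN UPDATE SET t.status = s.status
  WHEN NOT MATCHED AND s.METADATA$ACTION = 'INSERT' THEN
    INSERT (order_id, status) VALUES (s.order_id, s.status);

  -- Process customers
  MERGE INTO customers_target t
  USING (
    SELECT * FROM customers_stream
    WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE = TRUE)
  ) s
  ON t.customer_id = s.customer_id
  WHEN MATCHED AND s.METADATA$ACTION = 'DELETE' THEN DELETE
  WHEN MATCHED AND s.METADATA$ACTION = 'INSERT' THEN UPDATE SET t.name = s.name
  WHEN NOT MATCHED AND s.METADATA$ACTION = 'INSERT' THEN
    INSERT (customer_id, name) VALUES (s.customer_id, s.name);

  -- Process products
  MERGE INTO products_target t
  USING (
    SELECT * FROM products_stream
    WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE = TRUE)
  ) s
  ON t.product_id = s.product_id
  WHEN MATCHED AND s.METADATA$ACTION = 'DELETE' THEN DELETE
  WHEN MATCHED AND s.METADATA$ACTION = 'INSERT' THEN UPDATE SET t.price = s.price
  WHEN NOT MATCHED AND s.METADATA$ACTION = 'INSERT' THEN
    INSERT (product_id, price) VALUES (s.product_id, s.price);
END;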

Stream Monitoring

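Monitor stream lag by comparing the oldest unconsumed change with the current time, and alert when the lag exceeds your defined threshold. A stream does not store change data itself; it depends on the source table's retention. If a stream is not consumed before its offset falls outside that retention window, it becomes stale and must be recreated. Snowflake extends the window to 14 days by default (MAX_DATA_EXTENSION_TIME_IN_DAYS).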

-- Check stream metadata (source table, mode, stale, stale_after)
SHOW STREAMS LIKE 'CDC_STREAM';

-- Count pending changes by type
-- (streams expose only METADATA$ACTION, METADATA$ISUPDATE, and METADATA$ROW_ID)
SELECT
  METADATA$ACTION as action,
  METADATA$ISUPDATE as is_update,
  COUNT(*) as pending_changes
FROM cdc_stream
GROUP BY 1, 2
ORDER BY 1, 2;

-- Estimate lag from the source table's own timestamp column
-- (updated_at is the column used in the MERGE above; streams carry no per-row change time)
SELECT
  COUNT(*) as pending_changes,
  MIN(updated_at) as oldest_pending_change,
  DATEDIFF('second', MIN(updated_at), CURRENT_TIMESTAMP()) as lag_seconds
FROM cdc_stream
WHERE METADATA$ACTION = 'INSERT';
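
The lag alerting described above could be sketched with a Snowflake alert. Everything named here is an assumption for illustration: the stream_lag_alerts log table, the 30-minute threshold, and the updated_at column on the source table. The condition only selects from the stream, so it does not consume any changes.

```sql
-- Hypothetical table that records each time the alert fires
CREATE TABLE IF NOT EXISTS stream_lag_alerts (
  stream_name VARCHAR,
  detected_at TIMESTAMP_LTZ
);

CREATE OR REPLACE ALERT cdc_stream_lag_alert
  WAREHOUSE = compute_wh
  SCHEDULE = '5 MINUTE'
  IF (EXISTS (
    -- Fires when the oldest pending change is more than 30 minutes old
    SELECT 1
    FROM (
      SELECT MIN(updated_at) AS oldest_pending
      FROM cdc_stream
      WHERE METADATA$ACTION = 'INSERT'
    )
    WHERE DATEDIFF('minute', oldest_pending, CURRENT_TIMESTAMP()) > 30
  ))
  THEN
    INSERT INTO stream_lag_alerts (stream_name, detected_at)
    SELECT 'CDC_STREAM', CURRENT_TIMESTAMP();

-- Alerts, like tasks, are created suspended
ALTER ALERT cdc_stream_lag_alert RESUME;
```

Writing to a table keeps the sketch self-contained; in practice the THEN action is often a notification call instead, which requires a notification integration to be configured first.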

CDC vs Full Refresh Comparison

Metric | CDC | Full Refresh | Delta
Data Volume | Only changes | Complete table | 95%+ reduction
Latency | Seconds | Hours | Near real-time
Cost | Low | High | 90%+ savings
Complexity | Medium | Low | Slight increase
Consistency | Eventual | Point-in-time | Trade-off

  • Streams track DML changes with metadata for CDC implementation
  • Tasks automate MERGE operations for target synchronization
  • SCD Type 2 patterns maintain historical data versions
  • Monitor stream lag and retention to prevent data loss
  • CDC significantly reduces compute and storage costs vs full refresh
