πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Topic: Snowpipe & Data Ingestion

Snowflake AdvancedSnowpipe⭐ Premium

Advertisement

Snowflake Advanced Β· Interview Prep

Snowpipe & Data Ingestion

Difficulty: Medium-Hard Β· Commonly asked at Meta, Apple, Netflix

Interview Question

"Design a Snowpipe-based ingestion pipeline that handles 10GB/hour of JSON event data from S3, with requirements for schema evolution handling, deduplication, and error recovery. How do you monitor and optimize the pipeline?"

ℹ️

Companies Asking This: Meta (Data Engineer), Apple (Senior Data Engineer), Netflix (Staff Data Engineer), Amazon (L6 Data Engineer)


Snowpipe Fundamentals

Snowpipe is Snowflake's continuous data ingestion service that loads data from cloud storage into Snowflake tables. It uses a micro-batch loading approach (typically every 1-2 minutes).

Architecture

S3 / Azure / GCSSnowpipe(Service)Staging AreaTarget TableEventNotificationAuto-Ingest(Micro-batch)COPY INTO(1-2 min)DataAvailable

Basic Snowpipe Setup

-- 1. Create target table
CREATE TABLE raw_events (
    event_id VARCHAR(100),
    event_type VARCHAR(50),
    event_timestamp TIMESTAMP_NTZ,
    user_id VARCHAR(100),
    event_data VARIANT,
    _ingestion_time TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
    _source_file VARCHAR(500)
);

-- 2. Create external stage
CREATE OR REPLACE STAGE s3_events_stage
    URL = 's3://my-bucket/events/'
    STORAGE_INTEGRATION = s3_integration
    FILE_FORMAT = (TYPE = 'JSON');

-- 3. Create Snowpipe with auto-ingest
CREATE OR REPLACE PIPE s3_events_pipe
    AUTO_INGEST = TRUE
    INTEGRATION = 's3_events_notification'
    AS
    COPY INTO raw_events
    FROM @s3_events_stage
    FILE_FORMAT = (
        TYPE = 'JSON'
        STRIP_OUTER_ARRAY = TRUE
        TRIM_SPACE = TRUE
        ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE
    )
    ON_ERROR = 'CONTINUE'
    MATCH_BY_COLUMN_NAME = 'CASE_INSENSITIVE';

-- 4. Check pipe status
SELECT 
    pipe_name,
    pipe_owner,
    is_auto_ingest,
    notification_channel_name,
    created_on,
    last_altered_on
FROM information_schema.pipes
WHERE pipe_name = 'S3_EVENTS_PIPE';

-- 5. Monitor pipe ingestion
SELECT 
    pipe_name,
    file_name,
    file_size,
    table_name,
    rows_loaded,
    rows_parsed,
    error_count,
    load_time,
    load_start_time,
    load_end_time,
    system$pipe_status('S3_EVENTS_PIPE')
FROM snowflake.account_usage.copy_history
WHERE load_start_time >= DATEADD(hour, -1, CURRENT_TIMESTAMP())
ORDER BY load_start_time DESC;

COPY INTO Options Deep Dive

-- Comprehensive COPY INTO configuration
COPY INTO raw_events
FROM @s3_events_stage
FILE_FORMAT = (
    TYPE = 'JSON'
    STRIP_OUTER_ARRAY = TRUE
    TRIM_SPACE = TRUE
    ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE
    NULL_IF = ('NULL', 'null', '', 'N/A')
    DATE_FORMAT = 'AUTO'
    TIMESTAMP_FORMAT = 'AUTO'
    BINARY_FORMAT = 'HEX'
    ENCODING = 'UTF8'
)
PATTERN = '.*events_[0-9]{8}_[0-9]{6}\\.json\\.gz$'
SKIP_HEADER = 0
TRUNCATECOLUMNS = FALSE
FORCE = FALSE
ON_ERROR = 'CONTINUE'  -- SKIP_FILE, ABORT_STATEMENT, CONTINUE
SIZE_LIMIT = 10737418240  -- 10GB max file size
PURGE = TRUE  -- Delete files after successful load
RETURN_FAILED_ONLY = FALSE
ENFORCE_LENGTH = FALSE
IGNORE_ORDER = FALSE
LOAD_UNORDERED_FILES = TRUE;

-- With column mapping
COPY INTO raw_events (event_id, event_type, event_timestamp, user_id, event_data)
FROM (
    SELECT 
        $1:event_id::VARCHAR,
        $1:event_type::VARCHAR,
        $1:timestamp::TIMESTAMP_NTZ,
        $1:user_id::VARCHAR,
        $1
    FROM @s3_events_stage
)
FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = TRUE)
ON_ERROR = 'CONTINUE';

-- With pattern matching for specific files
COPY INTO raw_events
FROM @s3_events_stage
FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = TRUE)
PATTERN = '.*2024-01-15.*\\.json\\.gz$'
ON_ERROR = 'CONTINUE';

Real-World Scenario: Meta

Question: "How do you handle late-arriving data and duplicate events in a Snowpipe-based pipeline?"

Solution: Idempotent Ingestion Pattern

-- 1. Create table with deduplication support
CREATE TABLE raw_events_dedup (
    event_id VARCHAR(100),
    event_type VARCHAR(50),
    event_timestamp TIMESTAMP_NTZ,
    user_id VARCHAR(100),
    event_data VARIANT,
    _ingestion_time TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
    _source_file VARCHAR(500),
    _file_row_number NUMBER,
    PRIMARY KEY (event_id)
);

-- 2. Create Snowpipe with unique file tracking
CREATE OR REPLACE PIPE s3_events_pipe_dedup
    AUTO_INGEST = TRUE
    AS
    COPY INTO raw_events_dedup (event_id, event_type, event_timestamp, user_id, event_data, _source_file, _file_row_number)
    FROM (
        SELECT 
            $1:event_id::VARCHAR,
            $1:event_type::VARCHAR,
            $1:timestamp::TIMESTAMP_NTZ,
            $1:user_id::VARCHAR,
            $1,
            METADATA$FILENAME,
            METADATA$FILE_ROW_NUMBER
        FROM @s3_events_stage
    )
    FILE_FORMAT = (
        TYPE = 'JSON'
        STRIP_OUTER_ARRAY = TRUE
    )
    ON_ERROR = 'CONTINUE';

-- 3. Deduplication query for downstream consumers
CREATE OR REPLACE VIEW events_deduped AS
WITH ranked_events AS (
    SELECT 
        *,
        ROW_NUMBER() OVER (
            PARTITION BY event_id 
            ORDER BY _ingestion_time DESC
        ) AS rn
    FROM raw_events_dedup
)
SELECT 
    event_id,
    event_type,
    event_timestamp,
    user_id,
    event_data,
    _ingestion_time,
    _source_file
FROM ranked_events
WHERE rn = 1;

-- 4. Handle late-arriving data
-- Snowpipe automatically handles ordering by file name
-- But we need to handle late arrivals in downstream processing
CREATE OR REPLACE PROCEDURE process_events_batch()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
DECLARE
    batch_start TIMESTAMP_NTZ;
    processed_count NUMBER;
BEGIN
    batch_start := CURRENT_TIMESTAMP();
    
    -- Process only new events
    INSERT INTO events_processed
    SELECT 
        event_id,
        event_type,
        event_timestamp,
        user_id,
        PARSE_JSON(event_data):page_url::VARCHAR AS page_url,
        PARSE_JSON(event_data):session_id::VARCHAR AS session_id
    FROM events_deduped
    WHERE event_timestamp >= DATEADD(hour, -1, CURRENT_TIMESTAMP())
      AND event_id NOT IN (SELECT event_id FROM events_processed);
    
    processed_count := SQLROWCOUNT;
    
    RETURN 'Processed ' || processed_count || ' events at ' || 
           TO_CHAR(batch_start, 'YYYY-MM-DD HH24:MI:SS');
END;
$$;

Real-World Scenario: Apple

Question: "How do you handle schema evolution when the upstream event format changes? The source adds new fields without notification."

Schema Evolution Strategy

-- 1. Use VARIANT column for flexible schema
CREATE TABLE raw_events_flexible (
    event_id VARCHAR(100) NOT NULL,
    event_type VARCHAR(50),
    event_timestamp TIMESTAMP_NTZ,
    raw_data VARIANT,  -- Stores entire JSON payload
    _schema_version VARCHAR(20),
    _ingestion_time TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

-- 2. Extract known fields, keep everything in VARIANT
COPY INTO raw_events_flexible (event_id, event_type, event_timestamp, raw_data, _schema_version)
FROM (
    SELECT 
        $1:event_id::VARCHAR,
        $1:event_type::VARCHAR,
        $1:timestamp::TIMESTAMP_NTZ,
        $1,  -- Store entire payload
        COALESCE($1:schema_version::VARCHAR, 'v1')
    FROM @s3_events_stage
)
FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = TRUE)
ON_ERROR = 'CONTINUE';

-- 3. Create views for different schema versions
CREATE OR REPLACE VIEW events_v1 AS
SELECT 
    event_id,
    event_type,
    event_timestamp,
    raw_data:user_id::VARCHAR AS user_id,
    raw_data:page_url::VARCHAR AS page_url,
    NULL::VARCHAR AS device_type,  -- Not available in v1
    NULL::VARCHAR AS app_version    -- Not available in v1
FROM raw_events_flexible
WHERE _schema_version = 'v1' OR _schema_version IS NULL;

CREATE OR REPLACE VIEW events_v2 AS
SELECT 
    event_id,
    event_type,
    event_timestamp,
    raw_data:user_id::VARCHAR AS user_id,
    raw_data:page_url::VARCHAR AS page_url,
    raw_data:device_type::VARCHAR AS device_type,
    raw_data:app_version::VARCHAR AS app_version
FROM raw_events_flexible
WHERE _schema_version = 'v2';

-- 4. Unified view that handles all versions
CREATE OR REPLACE VIEW events_unified AS
SELECT 
    event_id,
    event_type,
    event_timestamp,
    raw_data:user_id::VARCHAR AS user_id,
    raw_data:page_url::VARCHAR AS page_url,
    raw_data:device_type::VARCHAR AS device_type,
    raw_data:app_version::VARCHAR AS app_version,
    raw_data:properties::VARIANT AS properties,  -- Additional fields
    _schema_version
FROM raw_events_flexible;

-- 5. Monitor schema evolution
SELECT 
    _schema_version,
    COUNT(*) AS event_count,
    MIN(_ingestion_time) AS first_seen,
    MAX(_ingestion_time) AS last_seen,
    COUNT(DISTINCT event_type) AS event_types
FROM raw_events_flexible
GROUP BY 1
ORDER BY 3 DESC;

ℹ️

Key Insight: Using VARIANT columns provides schema flexibility. Extract known fields for querying, but keep the full payload in VARIANT for when new fields appear. This is the "schema-on-read" pattern in action.


Error Handling & Recovery

-- 1. Monitor pipe errors
SELECT 
    pipe_name,
    file_name,
    error_count,
    error_message,
    load_start_time
FROM snowflake.account_usage.copy_history
WHERE error_count > 0
  AND load_start_time >= DATEADD(day, -7, CURRENT_TIMESTAMP())
ORDER BY load_start_time DESC;

-- 2. Query pipe status
SELECT SYSTEM$PIPE_STATUS('S3_EVENTS_PIPE');

-- 3. List files that failed to load
SELECT 
    file_name,
    file_size,
    error_message,
    load_start_time
FROM snowflake.account_usage.copy_history
WHERE pipe_name = 'S3_EVENTS_PIPE'
  AND error_count > 0
ORDER BY load_start_time DESC;

-- 4. Reload specific files
COPY INTO raw_events
FROM @s3_events_stage/path/to/failed/file.json.gz
FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = TRUE)
ON_ERROR = 'CONTINUE';

-- 5. Create error log table
CREATE TABLE pipe_errors (
    error_id NUMBER AUTOINCREMENT,
    pipe_name VARCHAR(100),
    file_name VARCHAR(500),
    error_message VARCHAR(2000),
    error_timestamp TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
    resolved BOOLEAN DEFAULT FALSE
);

-- 6. Populate error log
INSERT INTO pipe_errors (pipe_name, file_name, error_message)
SELECT 
    pipe_name,
    file_name,
    error_message
FROM snowflake.account_usage.copy_history
WHERE error_count > 0
  AND load_start_time >= DATEADD(hour, -1, CURRENT_TIMESTAMP());

Performance Optimization

-- 1. Monitor pipe ingestion performance
SELECT 
    pipe_name,
    AVG(rows_loaded) AS avg_rows_per_load,
    AVG(file_size) AS avg_file_size,
    COUNT(*) AS total_loads,
    SUM(rows_loaded) AS total_rows_loaded,
    AVG(TIMESTAMPDIFF(second, load_start_time, load_end_time)) AS avg_load_time_seconds
FROM snowflake.account_usage.copy_history
WHERE pipe_name = 'S3_EVENTS_PIPE'
  AND load_start_time >= DATEADD(day, -7, CURRENT_TIMESTAMP())
GROUP BY pipe_name;

-- 2. Optimize file sizes (target 100MB-1GB compressed)
SELECT 
    file_name,
    file_size / (1024*1024) AS file_size_mb,
    rows_loaded,
    CASE 
        WHEN file_size < 10485760 THEN 'TOO SMALL - overhead per file'
        WHEN file_size > 1073741824 THEN 'TOO LARGE - split files'
        ELSE 'OPTIMAL'
    END AS size_recommendation
FROM snowflake.account_usage.copy_history
WHERE pipe_name = 'S3_EVENTS_PIPE'
  AND load_start_time >= DATEADD(day, -1, CURRENT_TIMESTAMP())
ORDER BY file_size ASC;

-- 3. Create pipe with optimized settings
CREATE OR REPLACE PIPE optimized_pipe
    AUTO_INGEST = TRUE
    MAX_FILE_SIZE = 536870912  -- 512MB max file size
    AS
    COPY INTO raw_events
    FROM @s3_events_stage
    FILE_FORMAT = (
        TYPE = 'JSON'
        STRIP_OUTER_ARRAY = TRUE
    )
    ON_ERROR = 'CONTINUE';

Best Practices

PracticeRecommendation
File sizeTarget 100MB-1GB compressed for optimal throughput
File formatUse columnar formats (Parquet, ORC) for better compression
Error handlingUse ON_ERROR = 'CONTINUE' for production, 'SKIP_FILE' for critical
DeduplicationTrack source files or use event_id with ROW_NUMBER()
MonitoringSet up alerts for pipe lag > threshold
Schema evolutionUse VARIANT for flexibility, views for structured access

⚠️

Common Pitfalls:

  1. Too many small files β€” Causes high per-file overhead, slow ingestion
  2. No file naming convention β€” Makes debugging and reloading difficult
  3. Ignoring error logs β€” Silent failures can cause data gaps
  4. Not monitoring pipe lag β€” Snowpipe can fall behind without alerts

Advertisement