CW

Snowflake ETL Pipeline Patterns

Free Lesson

Advertisement

Snowflake ETL Pipeline Patterns

Effective ETL pipelines in Snowflake combine data ingestion, transformation, and loading with built-in reliability, scalability, and performance.

ETL Pipeline Components

1. Extraction Patterns

-- External stage over the S3 prefix, authenticated through the
-- s3_integration storage integration.
CREATE STAGE my_s3_stage
    URL = 's3://my-bucket/data/'
    STORAGE_INTEGRATION = s3_integration;

-- Show which files are currently visible in the stage.
LIST @my_s3_stage;

-- One-off bulk load of the staged Parquet files into raw_data.
-- Parquet fields are mapped to table columns by name, ignoring case.
COPY INTO raw_data
FROM @my_s3_stage
FILE_FORMAT = (TYPE = PARQUET)
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;

-- Create pipe for continuous ingestion.
-- The COPY statement inside the pipe must map Parquet fields to the table's
-- columns the same way the manual COPY INTO raw_data does; without
-- MATCH_BY_COLUMN_NAME a Parquet file can only be loaded into a single
-- VARIANT column, so the pipe would fail against a multi-column raw_data.
CREATE PIPE raw_data_pipe
  AUTO_INGEST = TRUE
  AS
  COPY INTO raw_data
  FROM @my_s3_stage
  FILE_FORMAT = (TYPE = PARQUET)
  MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;

2. Transformation Patterns

-- Create transformation procedure
-- Rebuilds silver.customers from the semi-structured RAW column of
-- bronze.raw_customers (full refresh).
-- Returns: 'SUCCESS: Customer data transformed' when the reload completes.
--          Any SQL error propagates to the caller.
CREATE OR REPLACE PROCEDURE transform_customer_data()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
  -- INSERT OVERWRITE truncates and reloads the target in ONE statement.
  -- A separate TRUNCATE followed by INSERT would leave silver.customers
  -- empty if the INSERT failed, and readers could observe the empty table
  -- between the two statements.
  INSERT OVERWRITE INTO silver.customers (
    customer_id,
    customer_name,
    email,
    created_date,
    updated_date
  )
  SELECT
    RAW:customer_id::INTEGER                                         AS customer_id,
    -- Full name in title case; NULL if either name part is missing.
    INITCAP(RAW:first_name::STRING || ' ' || RAW:last_name::STRING)  AS customer_name,
    LOWER(RAW:email::STRING)                                         AS email,
    RAW:created_date::DATE                                           AS created_date,
    CURRENT_TIMESTAMP()                                              AS updated_date
  FROM bronze.raw_customers
  -- Rows without a customer_id cannot be keyed and are skipped.
  WHERE RAW:customer_id IS NOT NULL;

  RETURN 'SUCCESS: Customer data transformed';
END;
$$;

3. Loading Patterns

-- MERGE pattern for upserts
-- Upserts bronze.raw_orders into silver.orders keyed on order_id.
-- Existing rows are updated only when the incoming order_date is newer;
-- unknown order_ids are inserted.
MERGE INTO silver.orders AS target
USING (
  SELECT
    order_id,
    customer_id,
    order_date,
    amount,
    status
  FROM bronze.raw_orders
  -- Keep one row per order_id (the most recent order_date). If several source
  -- rows matched the same target row the MERGE would be nondeterministic and,
  -- with Snowflake's default settings, fail with a duplicate-row error.
  -- NOTE(review): ties on order_date are broken arbitrarily; add a tiebreaker
  -- column (e.g. a load timestamp) if the raw table has one.
  QUALIFY ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY order_date DESC) = 1
) AS source
ON target.order_id = source.order_id
WHEN MATCHED AND source.order_date > target.order_date THEN
  UPDATE SET
    customer_id = source.customer_id,
    order_date = source.order_date,
    amount = source.amount,
    status = source.status,
    updated_at = CURRENT_TIMESTAMP()
WHEN NOT MATCHED THEN
  INSERT (order_id, customer_id, order_date, amount, status, created_at)
  VALUES (source.order_id, source.customer_id, source.order_date, source.amount, source.status, CURRENT_TIMESTAMP());

-- Full-refresh pattern: replace the contents of silver.daily_summary with
-- one row per order_date, aggregated from bronze.raw_orders.
INSERT OVERWRITE INTO silver.daily_summary
SELECT
    order_date,
    COUNT(*)    AS order_count,
    SUM(amount) AS total_revenue
FROM bronze.raw_orders
GROUP BY order_date;

Pipeline Orchestration

Using Tasks

-- Create task hierarchy: extract_task -> transform_task -> load_task.
-- extract_task is the root and runs daily at 02:00 America/New_York;
-- each child runs after its predecessor completes.
CREATE OR REPLACE TASK extract_task
  WAREHOUSE = 'COMPUTE_WH'
  SCHEDULE = 'USING CRON 0 2 * * * America/New_York'
AS
  CALL extract_source_data();

CREATE OR REPLACE TASK transform_task
  WAREHOUSE = 'COMPUTE_WH'
  AFTER extract_task
AS
  CALL transform_customer_data();

CREATE OR REPLACE TASK load_task
  WAREHOUSE = 'COMPUTE_WH'
  AFTER transform_task
AS
  CALL load_to_target();

-- Tasks are created in a suspended state. Until they are resumed the
-- schedule never fires and suspended child tasks are skipped.
-- Resume the children first and the root last, so the root cannot start a
-- run while part of the graph is still suspended.
ALTER TASK load_task RESUME;
ALTER TASK transform_task RESUME;
ALTER TASK extract_task RESUME;

-- Execute pipeline: trigger one run of the root task immediately instead of
-- waiting for the next scheduled time.
EXECUTE TASK extract_task;

-- Monitor task execution over the last 24 hours, newest first.
-- TASK_HISTORY exposes the task's name in the NAME column; it is aliased to
-- task_name here so the output keeps the same column name.
SELECT
  name AS task_name,
  query_id,
  state,
  scheduled_time,
  completed_time
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
  SCHEDULED_TIME_RANGE_START => DATEADD('hour', -24, CURRENT_TIMESTAMP())
))
ORDER BY scheduled_time DESC;

Using Streams and Tasks

-- Change-tracking stream on bronze.raw_customers.
--   APPEND_ONLY = FALSE       : the stream is not restricted to inserts.
--   SHOW_INITIAL_ROWS = TRUE  : the first read returns the rows already in the table.
CREATE STREAM customer_changes
    ON TABLE bronze.raw_customers
    APPEND_ONLY = FALSE
    SHOW_INITIAL_ROWS = TRUE;

-- Create task to process changes.
-- Every minute, applies the pending rows of the customer_changes stream to
-- silver.customers. Only INSERT-action stream rows are read: those flagged as
-- the new image of an update refresh a matching customer, and customer_ids
-- not yet in the target are inserted. DELETE-action rows are ignored.
CREATE OR REPLACE TASK process_customer_changes
  WAREHOUSE = 'COMPUTE_WH'
  SCHEDULE = 'USING CRON * * * * * America/New_York'
  -- Skip the run (and the warehouse resume it would cause) when the stream
  -- has no pending changes.
  WHEN SYSTEM$STREAM_HAS_DATA('customer_changes')
AS
  MERGE INTO silver.customers AS target
  USING (
    SELECT
      RAW:customer_id::INTEGER AS customer_id,
      RAW:first_name::STRING AS first_name,
      RAW:last_name::STRING AS last_name,
      RAW:email::STRING AS email,
      METADATA$ACTION AS action,
      METADATA$ISUPDATE AS is_update
    FROM customer_changes
    WHERE METADATA$ACTION = 'INSERT'
  ) AS source
  ON target.customer_id = source.customer_id
  WHEN MATCHED AND source.is_update THEN
    UPDATE SET
      first_name = source.first_name,
      last_name = source.last_name,
      email = source.email,
      updated_at = CURRENT_TIMESTAMP()
  WHEN NOT MATCHED THEN
    INSERT (customer_id, first_name, last_name, email, created_at)
    VALUES (source.customer_id, source.first_name, source.last_name, source.email, CURRENT_TIMESTAMP());

-- Tasks are created suspended; the schedule does not fire until resumed.
ALTER TASK process_customer_changes RESUME;

Error Handling

-- Create error handling procedure
-- Calls the zero-argument stored procedure named by procedure_name inside a
-- transaction and logs any failure to pipeline_errors.
-- Parameters:
--   procedure_name  Name of the procedure to call, optionally qualified as
--                   schema.name or database.schema.name (unquoted identifiers).
-- Returns:
--   'SUCCESS: <procedure_name>' when the call and commit succeed;
--   'ERROR: <message>' when the name is rejected or the call raises.
CREATE OR REPLACE PROCEDURE execute_with_error_handling(
  procedure_name STRING
)
RETURNS STRING
LANGUAGE SQL
AS
$$
DECLARE
  call_stmt STRING;
  error_message STRING;
BEGIN
  -- The name is concatenated into dynamic SQL, so accept only plain
  -- (optionally qualified) identifiers; anything else could inject SQL.
  IF (procedure_name IS NULL
      OR NOT REGEXP_LIKE(procedure_name, '[A-Za-z_][A-Za-z0-9_$]*([.][A-Za-z_][A-Za-z0-9_$]*){0,2}')) THEN
    RETURN 'ERROR: invalid procedure name';
  END IF;

  call_stmt := 'CALL ' || procedure_name || '()';

  BEGIN TRANSACTION;
    EXECUTE IMMEDIATE :call_stmt;
  COMMIT;

  RETURN 'SUCCESS: ' || procedure_name;
EXCEPTION
  WHEN OTHER THEN
    -- Capture the message before running any further statement.
    error_message := SQLERRM;
    ROLLBACK;
    -- Logged after the rollback so the error row itself is not undone.
    -- Scripting variables used inside a SQL statement need the ':' prefix.
    INSERT INTO pipeline_errors (
      procedure_name,
      error_message,
      error_timestamp
    ) VALUES (
      :procedure_name,
      :error_message,
      CURRENT_TIMESTAMP()
    );
    RETURN 'ERROR: ' || error_message;
END;
$$;

-- Errors logged to pipeline_errors during the last day, most recent first.
SELECT
    pe.procedure_name,
    pe.error_message,
    pe.error_timestamp
FROM pipeline_errors AS pe
WHERE pe.error_timestamp >= DATEADD('day', -1, CURRENT_TIMESTAMP())
ORDER BY pe.error_timestamp DESC;

Always implement error handling and logging in ETL pipelines. Use transactions to ensure atomicity and rollback capabilities for data integrity.

Performance Optimization

-- Stage the source rows in a session-scoped temporary table.
CREATE TEMPORARY TABLE temp_stage AS
    SELECT * FROM source_table;

-- Load the target from the staged copy in a single bulk insert.
INSERT INTO target_table
    SELECT * FROM temp_stage;

-- Let compute_wh scale out to as many as 4 clusters, using the ECONOMY
-- scaling policy.
ALTER WAREHOUSE compute_wh SET
    MAX_CLUSTER_COUNT = 4,
    SCALING_POLICY = 'ECONOMY';

-- Monitor pipeline performance: INSERT and MERGE statements that finished in
-- the last hour, slowest first.
-- QUERY_HISTORY filters by END_TIME_RANGE_START / END_TIME_RANGE_END (there
-- is no START_TIME argument) and reports execution time in milliseconds in
-- the EXECUTION_TIME column, aliased here to keep the original column name.
SELECT
  query_id,
  query_text,
  execution_time AS execution_time_ms,
  bytes_scanned,
  rows_produced
FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY(
  END_TIME_RANGE_START => DATEADD('hour', -1, CURRENT_TIMESTAMP())
))
-- Filter on the statement type rather than on query text: a text LIKE is
-- case-sensitive and also matches this monitoring query itself.
WHERE query_type IN ('INSERT', 'MERGE')
ORDER BY execution_time_ms DESC;

Pipeline Monitoring

-- Create pipeline monitoring view: task runs scheduled in the last 7 days,
-- with run duration measured from scheduled time to completion.
-- TASK_HISTORY exposes the task's name in the NAME column; it is aliased to
-- task_name so the view keeps that column name.
CREATE OR REPLACE VIEW pipeline_monitoring AS
SELECT
  name AS task_name,
  state,
  scheduled_time,
  completed_time,
  TIMESTAMPDIFF('second', scheduled_time, completed_time) AS duration_seconds,
  query_id,
  error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
  SCHEDULED_TIME_RANGE_START => DATEADD('day', -7, CURRENT_TIMESTAMP())
))
ORDER BY scheduled_time DESC;

-- Monitor data freshness: when silver.customers was last updated and how
-- many hours ago that was.
-- table_name is emitted as a literal label; it is not a data column of
-- silver.customers, so selecting or grouping by it would fail.
-- last_updated is NULL when the table is empty or no row has updated_at set.
SELECT
  'silver.customers' AS table_name,
  MAX(updated_at) AS last_updated,
  TIMESTAMPDIFF('hour', MAX(updated_at), CURRENT_TIMESTAMP()) AS hours_since_update
FROM silver.customers;

ETL Best Practices

Pattern | Implementation | Benefit
Incremental Processing | Streams + Tasks | Efficient updates
Idempotent Operations | MERGE statements | Safe retries
Error Handling | EXCEPTION blocks (Snowflake Scripting's TRY/CATCH equivalent) | Reliable execution
Logging | Audit tables | Troubleshooting
Monitoring | Task history views | Pipeline visibility
Optimization | Warehouse scaling | Performance

Key Takeaways:

  • ETL pipelines combine extraction, transformation, and loading
  • Snowflake Tasks enable scheduled and dependent execution
  • Streams provide real-time change data capture
  • Error handling ensures pipeline reliability
  • Performance optimization through warehouse scaling
  • Monitoring provides pipeline visibility and troubleshooting

Advertisement

Need Expert Snowflake Help?

Get personalized warehouse optimization, data modeling, or Snowflake platform consulting.

Advertisement