Snowflake ETL Pipeline Patterns
Effective ETL pipelines in Snowflake combine data ingestion, transformation, and loading with built-in reliability, scalability, and performance.
ETL Pipeline Components
1. Extraction Patterns
-- External stage: points at the S3 prefix that holds the source files and
-- accesses it through the s3_integration storage integration.
CREATE STAGE my_s3_stage
    URL = 's3://my-bucket/data/'
    STORAGE_INTEGRATION = s3_integration;

-- Inspect which files the stage currently exposes.
LIST @my_s3_stage;

-- Bulk-load the staged Parquet files into raw_data, pairing file columns with
-- table columns by name (case-insensitively) rather than by position.
COPY INTO raw_data
    FROM @my_s3_stage
    FILE_FORMAT = (TYPE = PARQUET)
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
-- Pipe for continuous ingestion: with AUTO_INGEST = TRUE, new files arriving
-- in the stage are loaded automatically via cloud-storage event notifications.
-- MATCH_BY_COLUMN_NAME is specified so Parquet columns are mapped onto the
-- raw_data columns by name, the same way the one-off bulk COPY into this
-- table does; without it the Parquet data is treated as a single VARIANT
-- column.
CREATE PIPE raw_data_pipe
    AUTO_INGEST = TRUE
AS
COPY INTO raw_data
    FROM @my_s3_stage
    FILE_FORMAT = (TYPE = PARQUET)
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
2. Transformation Patterns
-- Transformation procedure: rebuilds silver.customers from the semi-structured
-- RAW column of bronze.raw_customers.
-- Returns: a status string once the reload statement has completed.
-- The reload is a single INSERT OVERWRITE, so the target is emptied and
-- repopulated by one statement. A failure while evaluating the SELECT (for
-- example a bad cast) can no longer leave the table truncated but unloaded,
-- which was possible with a separate TRUNCATE followed by INSERT.
CREATE OR REPLACE PROCEDURE transform_customer_data()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
    INSERT OVERWRITE INTO silver.customers (
        customer_id,
        customer_name,
        email,
        created_date,
        updated_date
    )
    SELECT
        RAW:customer_id::INTEGER,
        -- Concatenation yields NULL when either name part is missing.
        INITCAP(RAW:first_name::STRING || ' ' || RAW:last_name::STRING),
        LOWER(RAW:email::STRING),
        RAW:created_date::DATE,
        CURRENT_TIMESTAMP()
    FROM bronze.raw_customers
    -- Skip records that carry no customer_id.
    WHERE RAW:customer_id IS NOT NULL;

    RETURN 'SUCCESS: Customer data transformed';
END;
$$;
3. Loading Patterns
-- MERGE pattern for upserts: bring silver.orders in line with bronze.raw_orders.
-- The source is reduced to one row per order_id first, because by default a
-- MERGE fails as nondeterministic when several source rows match the same
-- target row, and raw landing tables commonly hold repeated keys.
MERGE INTO silver.orders AS target
USING (
    SELECT
        order_id,
        customer_id,
        order_date,
        amount,
        status
    FROM bronze.raw_orders
    -- Keep the row with the greatest order_date per order, mirroring the
    -- recency rule used in the WHEN MATCHED branch below.
    -- NOTE(review): ties on order_date are broken arbitrarily; add a load
    -- timestamp to the ORDER BY if the raw table has one.
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY order_id
        ORDER BY order_date DESC NULLS LAST
    ) = 1
) AS source
    ON target.order_id = source.order_id
-- Existing order: overwrite only when the incoming row is newer.
WHEN MATCHED AND source.order_date > target.order_date THEN
    UPDATE SET
        customer_id = source.customer_id,
        order_date  = source.order_date,
        amount      = source.amount,
        status      = source.status,
        updated_at  = CURRENT_TIMESTAMP()
-- New order: insert it and stamp the creation time.
WHEN NOT MATCHED THEN
    INSERT (order_id, customer_id, order_date, amount, status, created_at)
    VALUES (
        source.order_id,
        source.customer_id,
        source.order_date,
        source.amount,
        source.status,
        CURRENT_TIMESTAMP()
    );
-- INSERT OVERWRITE pattern: replace the full contents of silver.daily_summary
-- with one aggregate row per order_date. Values are matched to the target
-- columns by position.
INSERT OVERWRITE INTO silver.daily_summary
SELECT
    ro.order_date,
    COUNT(*)       AS order_count,
    SUM(ro.amount) AS total_revenue
FROM bronze.raw_orders AS ro
GROUP BY ro.order_date;
Pipeline Orchestration
Using Tasks
-- Task graph: extract_task (root, scheduled) -> transform_task -> load_task.
-- The root is created first; each child names its predecessor with AFTER.
CREATE OR REPLACE TASK extract_task
    WAREHOUSE = 'COMPUTE_WH'
    SCHEDULE = 'USING CRON 0 2 * * * America/New_York'  -- daily at 02:00, New York time
AS
    CALL extract_source_data();

CREATE OR REPLACE TASK transform_task
    WAREHOUSE = 'COMPUTE_WH'
    AFTER extract_task
AS
    CALL transform_customer_data();

CREATE OR REPLACE TASK load_task
    WAREHOUSE = 'COMPUTE_WH'
    AFTER transform_task
AS
    CALL load_to_target();

-- Tasks are created in a suspended state. Resume the children before the
-- root: suspended children are skipped when the root runs, and a root that
-- stays suspended never fires on its schedule.
ALTER TASK load_task RESUME;
ALTER TASK transform_task RESUME;
ALTER TASK extract_task RESUME;

-- Execute pipeline: trigger one run of the graph now, independent of the
-- schedule.
EXECUTE TASK extract_task;
-- Monitor task execution: runs scheduled during the last 24 hours, newest first.
-- TASK_HISTORY exposes the task name in a column called NAME, so it is aliased
-- to task_name here. RESULT_LIMIT is raised from its default of 100 so that
-- frequently scheduled tasks do not truncate the window.
SELECT
    name AS task_name,
    query_id,
    state,
    scheduled_time,
    completed_time
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    SCHEDULED_TIME_RANGE_START => DATEADD('hour', -24, CURRENT_TIMESTAMP()),
    RESULT_LIMIT => 10000
))
ORDER BY scheduled_time DESC;
Using Streams and Tasks
-- Change-data-capture stream over bronze.raw_customers.
--   APPEND_ONLY = FALSE       -> track updates and deletes as well as inserts
--   SHOW_INITIAL_ROWS = TRUE  -> the first read returns the rows that already
--                                existed when the stream was created
CREATE STREAM customer_changes
    ON TABLE bronze.raw_customers
    APPEND_ONLY = FALSE
    SHOW_INITIAL_ROWS = TRUE;
-- Task that applies the changes captured by the customer_changes stream to
-- silver.customers once a minute.
-- The WHEN guard skips the run when the stream holds no changes, so the
-- warehouse is not started every minute for nothing.
CREATE OR REPLACE TASK process_customer_changes
    WAREHOUSE = 'COMPUTE_WH'
    SCHEDULE = 'USING CRON * * * * * America/New_York'
    WHEN SYSTEM$STREAM_HAS_DATA('customer_changes')
AS
    MERGE INTO silver.customers AS target
    USING (
        SELECT
            RAW:customer_id::INTEGER AS customer_id,
            RAW:first_name::STRING   AS first_name,
            RAW:last_name::STRING    AS last_name,
            RAW:email::STRING        AS email
        FROM customer_changes
        -- INSERT rows carry the new image of both inserted and updated records;
        -- the DELETE half of an update (and true deletes) are not propagated.
        WHERE METADATA$ACTION = 'INSERT'
        -- NOTE(review): if one batch can hold several rows for the same
        -- customer_id, deduplicate here (QUALIFY ROW_NUMBER() ...) using a
        -- load/ordering column; none is visible in this snippet.
    ) AS source
        ON target.customer_id = source.customer_id
    -- Any incoming row for a known customer refreshes it. The match is no
    -- longer restricted to METADATA$ISUPDATE rows, which silently dropped new
    -- raw rows appended for customers that already exist in the target.
    WHEN MATCHED THEN
        UPDATE SET
            first_name = source.first_name,
            last_name  = source.last_name,
            email      = source.email,
            updated_at = CURRENT_TIMESTAMP()
    WHEN NOT MATCHED THEN
        INSERT (customer_id, first_name, last_name, email, created_at)
        VALUES (
            source.customer_id,
            source.first_name,
            source.last_name,
            source.email,
            CURRENT_TIMESTAMP()
        );

-- Tasks are created suspended; resume it so the schedule takes effect.
ALTER TASK process_customer_changes RESUME;
Error Handling
-- Error-handling wrapper: calls a zero-argument stored procedure inside a
-- transaction and records any failure in pipeline_errors.
-- Parameters:
--   procedure_name  name of the procedure to call, optionally qualified as
--                   schema.name or database.schema.name
-- Returns:
--   'SUCCESS: <procedure_name>' when the call committed, otherwise
--   'ERROR: <message>' after the failure has been logged.
-- NOTE(review): DDL statements executed by the called procedure commit
-- implicitly, so ROLLBACK can only undo its DML.
CREATE OR REPLACE PROCEDURE execute_with_error_handling(
    procedure_name STRING
)
RETURNS STRING
LANGUAGE SQL
AS
$$
DECLARE
    call_stmt STRING;
    err_msg STRING;
BEGIN
    -- procedure_name is concatenated into dynamic SQL, so accept nothing but a
    -- plain identifier with up to two qualifying parts.
    IF (procedure_name IS NULL
        OR NOT REGEXP_LIKE(procedure_name, '^[A-Za-z_][A-Za-z0-9_]*([.][A-Za-z_][A-Za-z0-9_]*){0,2}$')) THEN
        err_msg := 'invalid procedure name';
        INSERT INTO pipeline_errors (procedure_name, error_message, error_timestamp)
        VALUES (:procedure_name, :err_msg, CURRENT_TIMESTAMP());
        RETURN 'ERROR: ' || err_msg;
    END IF;

    -- Build the statement first; EXECUTE IMMEDIATE then runs the variable.
    call_stmt := 'CALL ' || procedure_name || '()';

    BEGIN TRANSACTION;
    EXECUTE IMMEDIATE :call_stmt;
    COMMIT;
    RETURN 'SUCCESS: ' || procedure_name;
EXCEPTION
    WHEN OTHER THEN
        -- Undo the partial work before logging, so the log row is written
        -- outside the failed transaction.
        ROLLBACK;
        err_msg := SQLERRM;
        -- Scripting variables must be bound with a colon inside SQL
        -- statements; unbound names would be read as column references.
        INSERT INTO pipeline_errors (procedure_name, error_message, error_timestamp)
        VALUES (:procedure_name, :err_msg, CURRENT_TIMESTAMP());
        RETURN 'ERROR: ' || err_msg;
END;
$$;
-- Failures logged to pipeline_errors during the past day, most recent first.
SELECT
    pe.procedure_name,
    pe.error_message,
    pe.error_timestamp
FROM pipeline_errors AS pe
WHERE pe.error_timestamp >= DATEADD('day', -1, CURRENT_TIMESTAMP())
ORDER BY pe.error_timestamp DESC;
Always implement error handling and logging in ETL pipelines. Use transactions to ensure atomicity and rollback capabilities for data integrity.
Performance Optimization
-- Stage the source rows in a temporary table first.
CREATE TEMPORARY TABLE temp_stage AS
    SELECT src.*
    FROM source_table AS src;

-- Then load the target from the staged copy in one set-based INSERT.
-- Columns are matched by position, so temp_stage and target_table must agree
-- on column order.
INSERT INTO target_table
    SELECT staged.*
    FROM temp_stage AS staged;
-- Let the warehouse scale out to as many as 4 clusters under concurrent load
-- (multi-cluster scaling adds capacity for concurrent queries; it does not
-- speed up a single statement).
-- The ECONOMY policy favors keeping running clusters fully loaded over
-- starting new ones, trading some queuing for lower credit use.
ALTER WAREHOUSE compute_wh
    SET MAX_CLUSTER_COUNT = 4,
        SCALING_POLICY = 'ECONOMY';
-- Monitor pipeline performance: slowest INSERT / MERGE statements that
-- finished within the last hour.
-- QUERY_HISTORY is windowed with END_TIME_RANGE_START (it has no START_TIME
-- argument) and reports execution time, in milliseconds, in EXECUTION_TIME.
-- Filtering on QUERY_TYPE instead of a case-sensitive LIKE on the text also
-- keeps this monitoring query from matching itself.
SELECT
    query_id,
    query_text,
    execution_time AS execution_time_ms,
    bytes_scanned,
    rows_produced
FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY(
    END_TIME_RANGE_START => DATEADD('hour', -1, CURRENT_TIMESTAMP()),
    RESULT_LIMIT => 10000  -- default is 100 rows
))
WHERE query_type IN ('INSERT', 'MERGE')
ORDER BY execution_time DESC;
Pipeline Monitoring
-- Pipeline monitoring view: one row per task run scheduled in the last 7 days.
-- TASK_HISTORY exposes the task name as NAME; it is aliased so the view keeps
-- its task_name column. RESULT_LIMIT is raised from the default of 100 rows,
-- which would otherwise cut a 7-day window short for frequently run tasks.
CREATE OR REPLACE VIEW pipeline_monitoring AS
SELECT
    name AS task_name,
    state,
    scheduled_time,
    completed_time,
    -- Measured from the scheduled time, so it includes any wait before the
    -- run started; NULL while a run has not completed.
    TIMESTAMPDIFF('second', scheduled_time, completed_time) AS duration_seconds,
    query_id,
    error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    SCHEDULED_TIME_RANGE_START => DATEADD('day', -7, CURRENT_TIMESTAMP()),
    RESULT_LIMIT => 10000
))
ORDER BY scheduled_time DESC;
-- Monitor data freshness of silver.customers.
-- The table name is emitted as a literal: silver.customers has no table_name
-- column to select or group by. Rows inserted by the customer MERGE only have
-- created_at set, so freshness falls back to created_at when updated_at is
-- NULL.
SELECT
    'silver.customers' AS table_name,
    MAX(COALESCE(updated_at, created_at)) AS last_updated,
    TIMESTAMPDIFF(
        'hour',
        MAX(COALESCE(updated_at, created_at)),
        CURRENT_TIMESTAMP()
    ) AS hours_since_update
FROM silver.customers;
ETL Best Practices
| Pattern | Implementation | Benefit |
|---|---|---|
| Incremental Processing | Streams + Tasks | Efficient updates |
| Idempotent Operations | MERGE statements | Safe retries |
| Error Handling | EXCEPTION blocks (Snowflake Scripting) | Reliable execution |
| Logging | Audit tables | Troubleshooting |
| Monitoring | Task history views | Pipeline visibility |
| Optimization | Warehouse scaling | Performance |
Key Takeaways:
- ETL pipelines combine extraction, transformation, and loading
- Snowflake Tasks enable scheduled and dependent execution
- Streams provide change data capture; polled by frequent Tasks, they enable near-real-time incremental processing
- Error handling ensures pipeline reliability
- Performance optimization through warehouse scaling
- Monitoring provides pipeline visibility and troubleshooting