Snowflake Provider Integration with Airflow
Formal Definitions
Snowflake Provider
The Snowflake provider (apache-airflow-providers-snowflake) provides operators, hooks, and triggers for Snowflake data warehouse integration. It supports SQL execution, COPY INTO operations, Snowpipe management, and data transfer between Snowflake and cloud storage.
COPY INTO
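The COPY INTO command loads data from external stages (S3, GCS, Azure) into Snowflake tables. It supports file format specification, pattern matching, error handling, and per-statement size limits. Formally, for a target table $T$, a stage $S$, and a pattern $p$, `COPY INTO T FROM @S PATTERN = p` loads the file set $F = \{ f \in S \mid f \text{ matches } p \}$, where $p$ is a file pattern (a regular expression).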
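The file-selection step of PATTERN can be mimicked in plain Python. This is a sketch under the assumption that Snowflake anchors the regex to the entire file path (hence the usual leading `.*`); the stage listing is made up. It also uses `[.]` instead of `\.` for a literal dot, which sidesteps backslash escaping in both Python strings and Snowflake string literals.

```python
import re


def select_staged_files(paths: list, pattern: str) -> list:
    """Return the staged paths that COPY INTO ... PATTERN = '<pattern>' would load.

    Assumption: the regex must match the whole path, as with re.fullmatch.
    """
    regex = re.compile(pattern)
    return [path for path in paths if regex.fullmatch(path)]


# Hypothetical stage listing
staged = [
    "orders/2024-01-01/part-0.parquet",
    "orders/2024-01-01/part-1.parquet",
    "orders/2024-01-01/_SUCCESS",
    "orders/manifest.json",
]

print(select_staged_files(staged, r".*[.]parquet"))
# ['orders/2024-01-01/part-0.parquet', 'orders/2024-01-01/part-1.parquet']
print(select_staged_files(staged, r"[.]parquet"))
# [] : without '.*' the pattern cannot match a full path
```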
Snowpipe
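Snowpipe is Snowflake's continuous data ingestion service. It loads small batches of files into target tables shortly after they land in a stage, triggered either by cloud storage event notifications (auto-ingest) or by calls to the Snowpipe REST API. Snowpipe runs on Snowflake-managed serverless compute rather than a user warehouse, so Airflow workers do not carry the load.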
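An auto-ingest pipe is just a named COPY INTO statement. The helper below is a sketch that renders the DDL an Airflow task could run; the function, pipe, stage, and integration names are hypothetical. It assumes GCS and Azure pipes reference a notification integration through `INTEGRATION`, while S3 pipes usually omit it.

```python
from typing import Optional


def create_pipe_sql(pipe: str, table: str, stage: str, file_format: str,
                    notification_integration: Optional[str] = None) -> str:
    """Render CREATE PIPE ... AUTO_INGEST = TRUE AS COPY INTO ... (hypothetical helper)."""
    # Only add the INTEGRATION clause when a notification integration is given
    integration = (
        f"\n  INTEGRATION = '{notification_integration}'"
        if notification_integration else ""
    )
    return (
        f"CREATE PIPE IF NOT EXISTS {pipe}\n"
        f"  AUTO_INGEST = TRUE{integration}\n"
        f"AS\n"
        f"COPY INTO {table}\n"
        f"FROM @{stage}\n"
        f"FILE_FORMAT = ({file_format});"
    )


print(create_pipe_sql(
    "raw.orders_pipe", "staging.orders", "raw.raw_stage",
    "TYPE = 'CSV' SKIP_HEADER = 1",
    notification_integration="GCS_NOTIFICATION_INT",
))
```

Pipe health can then be checked with `SELECT SYSTEM$PIPE_STATUS('raw.orders_pipe');`.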
Detailed Explanation
Connection Setup
```
# Airflow connection for Snowflake
# Connection ID: snowflake_default
# Connection Type: Snowflake
# Host: <account>.snowflakecomputing.com
# Schema: PUBLIC
# Database: ANALYTICS
# Warehouse: COMPUTE_WH
# Account: <account_identifier>
# Login: <username>
# Password: <password>
#   (for key-pair auth, set private_key_file or private_key_content in Extra;
#    Password then holds the private key passphrase, if any)
# Extra: {
#   "region": "us-east-1",
#   "warehouse": "COMPUTE_WH",
#   "database": "ANALYTICS",
#   "role": "SYSADMIN"
# }
```
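Instead of the UI, the same connection can be supplied as an environment variable named `AIRFLOW_CONN_<CONN_ID>`; Airflow 2.3 and later accept a JSON value. A minimal sketch with a made-up account identifier and credentials (in practice these come from a secrets manager, not source code):

```python
import json
import os

# Hypothetical values mirroring the fields listed above
snowflake_conn = {
    "conn_type": "snowflake",
    "login": "etl_user",
    "password": "change-me",
    "schema": "PUBLIC",
    "extra": {
        "account": "myorg-myaccount",
        "warehouse": "COMPUTE_WH",
        "database": "ANALYTICS",
        "role": "SYSADMIN",
    },
}

# Airflow resolves conn_id "snowflake_default" from AIRFLOW_CONN_SNOWFLAKE_DEFAULT
os.environ["AIRFLOW_CONN_SNOWFLAKE_DEFAULT"] = json.dumps(snowflake_conn)
```

Set the variable in the scheduler and worker environments (for example in a deployment manifest); setting it inside one Python process only affects that process.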
Basic SQL Execution
```python
from datetime import datetime

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

with DAG(
    dag_id='snowflake_etl',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',  # `schedule_interval` is deprecated since Airflow 2.4
    catchup=False,
    tags=['snowflake', 'etl'],
) as dag:
    # Multiple statements in one task run sequentially on one connection
    create_schema = SnowflakeOperator(
        task_id='create_schema',
        snowflake_conn_id='snowflake_default',
        sql="""
            CREATE SCHEMA IF NOT EXISTS raw;
            CREATE SCHEMA IF NOT EXISTS staging;
            CREATE SCHEMA IF NOT EXISTS analytics;
        """,
    )

    create_table = SnowflakeOperator(
        task_id='create_orders_table',
        snowflake_conn_id='snowflake_default',
        sql="""
            CREATE TABLE IF NOT EXISTS staging.orders (
                order_id     INTEGER,
                customer_id  INTEGER,
                amount       DECIMAL(10,2),
                order_date   TIMESTAMP_NTZ,
                status       VARCHAR(20)
            )
            CLUSTER BY (order_date);
        """,
    )

    # Parquet loads into a multi-column table need MATCH_BY_COLUMN_NAME (or a
    # COPY transformation); otherwise Parquet produces a single VARIANT column.
    load_data = SnowflakeOperator(
        task_id='load_orders',
        snowflake_conn_id='snowflake_default',
        sql="""
            COPY INTO staging.orders
            FROM @raw_stage/orders/
            PATTERN = '.*[.]parquet'
            FILE_FORMAT = (
                TYPE = 'PARQUET'
                COMPRESSION = 'SNAPPY'
            )
            MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
            ON_ERROR = 'SKIP_FILE';
        """,
    )

    # Upsert recent orders into the fact table (assumes analytics.fct_orders exists)
    transform = SnowflakeOperator(
        task_id='transform_orders',
        snowflake_conn_id='snowflake_default',
        sql="""
            MERGE INTO analytics.fct_orders AS target
            USING (
                SELECT
                    order_id,
                    customer_id,
                    SUM(amount)     AS total_amount,
                    COUNT(*)        AS order_count,
                    MAX(order_date) AS last_order_date
                FROM staging.orders
                WHERE order_date >= DATEADD(day, -1, CURRENT_DATE())
                GROUP BY order_id, customer_id
            ) AS source
            ON target.order_id = source.order_id
            WHEN MATCHED THEN UPDATE SET
                total_amount    = source.total_amount,
                order_count     = source.order_count,
                last_order_date = source.last_order_date
            WHEN NOT MATCHED THEN INSERT
                (order_id, customer_id, total_amount, order_count, last_order_date)
            VALUES (source.order_id, source.customer_id, source.total_amount,
                    source.order_count, source.last_order_date);
        """,
    )

    create_schema >> create_table >> load_data >> transform
```
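Because the last task uses MERGE rather than INSERT, a retried or re-run task updates existing rows instead of duplicating them. The in-memory sketch below mimics that upsert with plain dicts (`merge_orders` is a hypothetical name); it models only the WHEN MATCHED / WHEN NOT MATCHED branches, not Snowflake's handling of duplicate source keys.

```python
from copy import deepcopy
from decimal import Decimal


def merge_orders(target: dict, source: list) -> dict:
    """Mimic MERGE INTO ... ON target.order_id = source.order_id on dicts.

    `target` maps order_id -> row; `source` stands in for the USING subquery.
    """
    merged = deepcopy(target)  # leave the input "table" untouched
    for row in source:
        key = row["order_id"]
        if key in merged:
            # WHEN MATCHED THEN UPDATE SET ... (customer_id is not updated)
            merged[key].update(
                total_amount=row["total_amount"],
                order_count=row["order_count"],
                last_order_date=row["last_order_date"],
            )
        else:
            # WHEN NOT MATCHED THEN INSERT (...)
            merged[key] = dict(row)
    return merged


target = {1: {"order_id": 1, "customer_id": 10, "total_amount": Decimal("5.00"),
              "order_count": 1, "last_order_date": "2024-01-01"}}
source = [
    {"order_id": 1, "customer_id": 10, "total_amount": Decimal("7.50"),
     "order_count": 2, "last_order_date": "2024-01-02"},
    {"order_id": 2, "customer_id": 11, "total_amount": Decimal("3.25"),
     "order_count": 1, "last_order_date": "2024-01-02"},
]

once = merge_orders(target, source)
twice = merge_orders(once, source)
assert once == twice  # re-running the same batch changes nothing
```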
GCS to Snowflake Loading
```python
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

# These tasks belong inside a `with DAG(...) as dag:` block like the one above.

# Step 1: Create a named stage pointing to GCS
# (the gcs_integration storage integration must already exist)
create_stage = SnowflakeOperator(
    task_id='create_gcs_stage',
    snowflake_conn_id='snowflake_default',
    sql="""
        CREATE OR REPLACE STAGE raw.raw_stage
            URL = 'gcs://data-lake/raw/orders/'
            STORAGE_INTEGRATION = gcs_integration;
    """,
)

# Step 2: List files in the stage
list_files = SnowflakeOperator(
    task_id='list_stage_files',
    snowflake_conn_id='snowflake_default',
    sql="LIST @raw.raw_stage;",
)

# Step 3: Load pipe-delimited CSV files with COPY INTO
# ('.*' prefix because the pattern must match the whole file path)
load_from_stage = SnowflakeOperator(
    task_id='load_from_stage',
    snowflake_conn_id='snowflake_default',
    sql="""
        COPY INTO staging.orders
        FROM @raw.raw_stage
        PATTERN = '.*orders_.*[.]csv'
        FILE_FORMAT = (
            TYPE = 'CSV'
            FIELD_DELIMITER = '|'
            SKIP_HEADER = 1
            NULL_IF = ('NULL', 'null', '')
        )
        ON_ERROR = 'CONTINUE';
    """,
)

create_stage >> list_files >> load_from_stage
```
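With `ON_ERROR = 'CONTINUE'`, COPY INTO succeeds even when it drops rows, so a downstream check is worth having. The function below is a sketch that assumes the COPY result rows reach Python as dicts keyed by COPY INTO's output column names (`file`, `status`, `errors_seen`, ...); how the rows arrive (XCom, a hook call) depends on the provider version and is not shown. The function name and sample rows are made up.

```python
def files_with_errors(copy_results: list) -> list:
    """Return the files for which a COPY INTO result row reports errors."""
    flagged = []
    for row in copy_results:
        # Normalize column-name case, since drivers may return FILE or file
        normalized = {str(key).lower(): value for key, value in row.items()}
        if int(normalized.get("errors_seen") or 0) > 0:
            flagged.append(normalized["file"])
    return flagged


# Hypothetical result rows from a COPY INTO ... ON_ERROR = 'CONTINUE' run
results = [
    {"file": "gcs://data-lake/raw/orders/orders_1.csv", "status": "LOADED",
     "rows_parsed": 100, "rows_loaded": 100, "errors_seen": 0},
    {"file": "gcs://data-lake/raw/orders/orders_2.csv", "status": "PARTIALLY_LOADED",
     "rows_parsed": 100, "rows_loaded": 97, "errors_seen": 3},
]

bad_files = files_with_errors(results)
if bad_files:
    print(f"COPY INTO skipped rows in: {bad_files}")
```

In a DAG, raising an exception instead of printing would fail the task and surface the partial load.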
Pre/Post Operator Pattern
Each Airflow task opens its own Snowflake connection, so `ALTER SESSION` settings apply only to statements in the same task. Issue them at the start of the task that needs them rather than in a separate pre-task:

```python
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

# Main ETL task: session settings come first because they last only
# for this task's connection
etl_task = SnowflakeOperator(
    task_id='run_etl',
    snowflake_conn_id='snowflake_default',
    sql="""
        ALTER SESSION SET TIMEZONE = 'UTC';
        ALTER SESSION SET STATEMENT_TIMEOUT_IN_SECONDS = 3600;
        ALTER SESSION SET QUERY_TAG = 'airflow_etl_{{ ds }}';
        CALL run_etl_procedure('{{ ds }}');
    """,
)

# Post-task: clean up persistent objects; session settings ended with the
# previous task's connection, so no UNSET is needed
post_task = SnowflakeOperator(
    task_id='cleanup',
    snowflake_conn_id='snowflake_default',
    sql="TRUNCATE TABLE IF EXISTS staging.temp_data;",
)

etl_task >> post_task
```
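A fixed tag such as `'airflow_etl_{{ ds }}'` works, but a structured tag is easier to group by in Snowflake's query history. A sketch that renders a JSON `QUERY_TAG` from values Airflow exposes in templates (`{{ dag.dag_id }}`, `{{ task.task_id }}`, `{{ ds }}`); the helper name is hypothetical, and its output would go at the start of a task's SQL.

```python
import json


def query_tag_sql(dag_id: str, task_id: str, ds: str) -> str:
    """Build ALTER SESSION SET QUERY_TAG with a small JSON document."""
    tag = json.dumps({"dag_id": dag_id, "task_id": task_id, "ds": ds}, sort_keys=True)
    if len(tag) > 2000:  # Snowflake limits QUERY_TAG to 2000 characters
        raise ValueError("QUERY_TAG too long")
    escaped = tag.replace("'", "''")  # double single quotes inside a SQL literal
    return f"ALTER SESSION SET QUERY_TAG = '{escaped}';"


print(query_tag_sql("snowflake_etl", "run_etl", "2024-01-01"))
# ALTER SESSION SET QUERY_TAG = '{"dag_id": "snowflake_etl", "ds": "2024-01-01", "task_id": "run_etl"}';
```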
Multi-Warehouse Pattern
```python
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

# Route each workload to its own warehouse (define inside a DAG context)
warehouses = {
    'heavy_etl': 'COMPUTE_WH_HEAVY',
    'light_transform': 'COMPUTE_WH_LIGHT',
    'ad_hoc': 'COMPUTE_WH_ADHOC',
}

for task_name, warehouse in warehouses.items():
    SnowflakeOperator(
        task_id=f'task_{task_name}',
        snowflake_conn_id='snowflake_default',
        # The explicit RESUME only matters if the warehouse has AUTO_RESUME = FALSE
        sql=f"""
            ALTER WAREHOUSE {warehouse} RESUME IF SUSPENDED;
            -- ETL logic here
        """,
        warehouse=warehouse,  # overrides the connection's default warehouse
    )
```
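Because the loop interpolates warehouse names into SQL with an f-string, a name taken from configuration could inject SQL. A sketch that accepts only unquoted Snowflake identifiers (a letter or underscore first, then letters, digits, underscores, or dollar signs, up to 255 characters) before rendering the statement; the helper name is hypothetical, and quoted identifiers are deliberately rejected.

```python
import re

# Unquoted Snowflake identifier: letter/underscore, then letters, digits, _ or $
_UNQUOTED_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]{0,254}")


def resume_warehouse_sql(warehouse: str) -> str:
    """Render ALTER WAREHOUSE ... RESUME IF SUSPENDED for a validated name."""
    if not _UNQUOTED_IDENTIFIER.fullmatch(warehouse):
        raise ValueError(f"not a plain Snowflake identifier: {warehouse!r}")
    return f"ALTER WAREHOUSE {warehouse} RESUME IF SUSPENDED;"


print(resume_warehouse_sql("COMPUTE_WH_HEAVY"))
# ALTER WAREHOUSE COMPUTE_WH_HEAVY RESUME IF SUSPENDED;
```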
Key Concepts Table
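| Operator | Purpose | Key Parameters |
|---|---|---|
| SnowflakeOperator | Execute SQL (deprecated in recent provider releases in favor of SQLExecuteQueryOperator) | sql, snowflake_conn_id, warehouse, role |
| SQLExecuteQueryOperator (common.sql provider) | Execute SQL through any SQL connection, including Snowflake | sql, conn_id, parameters, hook_params |
| SnowflakeSqlApiOperator | Submit statements through the Snowflake SQL API, optionally deferrable | sql, statement_count, snowflake_conn_id, deferrable |
| CopyFromExternalStageToSnowflakeOperator | Run COPY INTO from an external stage | table, stage, file_format, pattern, prefix |
| SnowflakeCheckOperator | Fail the task if any value in the query's first row is falsy | sql, snowflake_conn_id |
| SqlSensor (common.sql provider) | Wait until a query returns a truthy value | conn_id, sql, poke_interval |

The provider has no dedicated export-to-GCS operator; unload data by running `COPY INTO @<stage> FROM ...` through SnowflakeOperator.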
COPY INTO Options
| Option | Description | Example |
|---|---|---|
| FILE_FORMAT | File type specification | TYPE = 'PARQUET' |
| PATTERN | Regex for file matching | PATTERN = '.*\\.parquet' |
| ON_ERROR | Error handling strategy | SKIP_FILE, CONTINUE, ABORT_STATEMENT |
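| FORCE | Load all matching files, including ones already loaded (can duplicate rows) | FORCE = TRUE |
| SIZE_LIMIT | Max bytes per COPY statement; loading stops once the limit is exceeded (at least one file is always loaded) | SIZE_LIMIT = 1073741824 |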
| PURGE | Delete files after load | PURGE = TRUE |
| RETURN_FAILED_ONLY | Return only failed files | RETURN_FAILED_ONLY = TRUE |
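The options above compose into a single statement. A sketch of a hypothetical builder: PATTERN is placed before FILE_FORMAT as in Snowflake's documented syntax, booleans render as TRUE/FALSE, and other copy-option values are inserted verbatim, so the caller supplies any quoting they need.

```python
from typing import Optional


def build_copy_into(table: str, stage: str, file_format: str,
                    pattern: Optional[str] = None, **copy_options) -> str:
    """Assemble COPY INTO <table> FROM @<stage> with optional copy options."""
    lines = [f"COPY INTO {table}", f"FROM @{stage}"]
    if pattern is not None:
        escaped = pattern.replace("'", "''")  # keep the SQL literal well-formed
        lines.append(f"PATTERN = '{escaped}'")
    lines.append(f"FILE_FORMAT = ({file_format})")
    for name, value in copy_options.items():
        # bool is checked first because it is a subclass of int
        rendered = ("TRUE" if value else "FALSE") if isinstance(value, bool) else str(value)
        lines.append(f"{name.upper()} = {rendered}")
    return "\n".join(lines) + ";"


print(build_copy_into(
    "staging.orders", "raw.raw_stage", "TYPE = 'PARQUET'",
    pattern=".*[.]parquet",
    match_by_column_name="CASE_INSENSITIVE",
    on_error="SKIP_FILE",
    size_limit=1073741824,
    purge=True,
))
```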
Best Practices
- Use named stages backed by storage integrations for reusable external locations; this avoids embedding credentials in SQL.
- Set `ON_ERROR` appropriately: `SKIP_FILE` to skip files that contain errors, `CONTINUE` to load valid rows and skip bad ones.
- Split large COPY INTO operations with `SIZE_LIMIT` to avoid single large transactions.
- Use `PURGE = TRUE` to remove staged files after a successful load.
- Set `QUERY_TAG` for monitoring and cost attribution per Airflow DAG.
- Monitor Snowpipe with `SHOW PIPES` and `SYSTEM$PIPE_STATUS`.
- Use key-pair authentication in production rather than password-based connections.
- Define clustering keys on large tables for columns frequently used in filters or joins.
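The SIZE_LIMIT behavior can be modeled directly: a COPY statement keeps taking files until the bytes loaded exceed the limit, then stops, and it always loads at least one file when any are pending. Rerunning the statement picks up the remaining files because load metadata skips those already loaded. This is an idealized, sequential sketch (`size_limit_batches` is a hypothetical name), not Snowflake's actual scheduler.

```python
def size_limit_batches(file_sizes: list, size_limit: int) -> list:
    """Group file sizes into the batches that successive COPY INTO runs would load."""
    batches, current, loaded = [], [], 0
    for size in file_sizes:
        current.append(size)
        loaded += size
        if loaded > size_limit:  # threshold exceeded: this statement stops here
            batches.append(current)
            current, loaded = [], 0
    if current:  # the final statement loads whatever remains
        batches.append(current)
    return batches


# Seven 10 MB files with SIZE_LIMIT = 25 MB
print([len(batch) for batch in size_limit_batches([10_000_000] * 7, 25_000_000)])
# [3, 3, 1]
```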
With Snowpipe auto-ingest, Airflow's role shrinks to setup and monitoring: use Airflow to create and manage pipes and to check their status, and let Snowflake handle continuous loading. This shortens task runtime and reduces worker resource usage.
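A monitoring task can interpret the JSON string returned by `SELECT SYSTEM$PIPE_STATUS('<pipe>')`. A sketch that relies on its `executionState` and `pendingFileCount` fields; the function name and the backlog threshold are hypothetical, and the result could feed a Python callable in a sensor or check task.

```python
import json


def pipe_is_healthy(status_json: str, max_pending_files: int = 100) -> bool:
    """Return True if the pipe is running and its backlog is within the threshold.

    max_pending_files is an illustrative threshold, not a Snowflake setting.
    """
    status = json.loads(status_json)
    return (
        status.get("executionState") == "RUNNING"
        and int(status.get("pendingFileCount", 0)) <= max_pending_files
    )


print(pipe_is_healthy('{"executionState": "RUNNING", "pendingFileCount": 3}'))  # True
print(pipe_is_healthy('{"executionState": "PAUSED", "pendingFileCount": 0}'))   # False
```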
Virtual warehouses consume credits per second while running, with a 60-second minimum each time a warehouse starts. Set a short AUTO_SUSPEND, or run ALTER WAREHOUSE ... SUSPEND after heavy ETL, to avoid paying for idle time; Airflow tasks can handle resume and suspend as pre/post steps.
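The billing arithmetic is easy to sketch. Assumptions: standard warehouses, whose credit rate doubles with each size step (1 credit per hour for X-Small up to 128 for 4X-Large), and a 60-second minimum each time a warehouse resumes; other warehouse types and current pricing should be checked against Snowflake's documentation.

```python
# Credits per hour for standard warehouses, by size
CREDITS_PER_HOUR = {
    "XSMALL": 1, "SMALL": 2, "MEDIUM": 4, "LARGE": 8,
    "XLARGE": 16, "2XLARGE": 32, "3XLARGE": 64, "4XLARGE": 128,
}


def warehouse_credits(size: str, running_seconds: float) -> float:
    """Credits for one resume-to-suspend run: per-second billing, 60-second minimum."""
    billed_seconds = max(running_seconds, 60)
    return CREDITS_PER_HOUR[size] * billed_seconds / 3600


print(warehouse_credits("MEDIUM", 1800))  # 2.0
```

For example, a LARGE warehouse that idles through a 10-minute auto-suspend window after each run spends about 1.33 credits per run doing nothing, which is the cost an explicit suspend or a shorter AUTO_SUSPEND avoids.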
Key Takeaways:
- SnowflakeOperator is the primary operator for Snowflake SQL operations; recent provider releases deprecate it in favor of SQLExecuteQueryOperator
- COPY INTO loads data from external stages (S3, GCS, Azure) into Snowflake tables
- Snowpipe provides continuous ingestion: use Airflow to manage and monitor pipes, not to poll for data
- Set QUERY_TAG per DAG for cost attribution and query monitoring
- Use key-pair authentication for production security
- Monitor COPY INTO results with the VALIDATE table function, COPY_HISTORY, and query history
See Also
- BigQuery Provider β Google BigQuery integration patterns
- Databricks Provider β Databricks cluster and job management
- Operators and Hooks β Operator lifecycle and hook architecture
- XCom Communications β Task communication and data passing