Snowflake Provider Integration with Airflow

Formal Definitions

Definition: Snowflake Provider

The Snowflake provider package (apache-airflow-providers-snowflake) ships operators, hooks, and triggers for integrating Airflow with the Snowflake data warehouse. It supports SQL execution and COPY INTO transfers between Snowflake and cloud storage; Snowpipe objects can be created and managed by running the corresponding SQL statements through the same operators.

Definition: COPY INTO

The COPY INTO command loads data from staged files, including external stages on S3, GCS, or Azure, into Snowflake tables. It supports file format specification, regex pattern matching, error handling via ON_ERROR, and a per-statement size cap via SIZE_LIMIT. Formally: $\text{COPY INTO } T \leftarrow \text{Stage}(P)$, where $T$ is the target table and $P$ is a file pattern.
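To make the definition concrete, the following minimal sketch assembles a COPY INTO statement from a target table, a stage, and a file pattern. The helper name `build_copy_into` and its parameters are hypothetical and not part of the provider; it only formats SQL text and does not talk to Snowflake.

```python
def build_copy_into(table, stage, pattern=None,
                    file_format="TYPE = 'CSV'", on_error="ABORT_STATEMENT"):
    """Return a COPY INTO statement as text (hypothetical helper, formatting only)."""
    lines = [
        f"COPY INTO {table}",
        f"FROM @{stage}",
        f"FILE_FORMAT = ({file_format})",
    ]
    if pattern is not None:
        # PATTERN is a regular expression matched against staged file paths.
        lines.append(f"PATTERN = '{pattern}'")
    lines.append(f"ON_ERROR = '{on_error}'")
    return "\n".join(lines) + ";"


# T = staging.orders, Stage = raw.raw_stage, P = '.*[.]csv'
copy_sql = build_copy_into(
    "staging.orders",
    "raw.raw_stage",
    pattern=".*[.]csv",
    on_error="SKIP_FILE",
)
print(copy_sql)
```

The `[.]` character class is used instead of a backslash escape so the pattern does not depend on how Python and SQL string literals treat backslashes.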

Definition: Snowpipe

Snowpipe is Snowflake's continuous data ingestion service. It automatically loads small files into target tables using auto-ingest or API-triggered pipelines. Snowpipe runs on Snowflake's compute resources, reducing Airflow's operational overhead.

Detailed Explanation

Connection Setup

# Airflow connection for Snowflake
# Connection ID: snowflake_default
# Connection Type: Snowflake
# Host: <account>.snowflakecomputing.com
# Schema: PUBLIC
# Database: ANALYTICS
# Warehouse: COMPUTE_WH
# Account: <account_identifier>
# Login: <username>
# Password: <password> (or use private_key for key-pair auth)
# Extra: {
#   "region": "us-east-1",
#   "warehouse": "COMPUTE_WH",
#   "database": "ANALYTICS",
#   "role": "SYSADMIN"
# }
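The same connection can also be supplied through an environment variable instead of the UI. The sketch below assumes Airflow 2.3 or later, which accepts JSON-serialized connections in `AIRFLOW_CONN_<CONN_ID>` variables. The helper name `snowflake_conn_json` is hypothetical, and the angle-bracket values are placeholders to be replaced with real credentials.

```python
import json
import os


def snowflake_conn_json(account, login, password, schema, database, warehouse, role):
    """Serialize a Snowflake connection in Airflow's JSON connection format."""
    conn = {
        "conn_type": "snowflake",
        "login": login,
        "password": password,
        "schema": schema,
        # Snowflake-specific settings live under "extra".
        "extra": {
            "account": account,
            "database": database,
            "warehouse": warehouse,
            "role": role,
        },
    }
    return json.dumps(conn)


conn_json = snowflake_conn_json(
    account="<account_identifier>",
    login="<username>",
    password="<password>",
    schema="PUBLIC",
    database="ANALYTICS",
    warehouse="COMPUTE_WH",
    role="SYSADMIN",
)

# Airflow resolves the connection ID snowflake_default from this variable name.
os.environ["AIRFLOW_CONN_SNOWFLAKE_DEFAULT"] = conn_json
```

In practice the variable would be set in the scheduler and worker environment (or a secrets backend) rather than inside Python code; setting it here only illustrates the expected name and shape.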

Basic SQL Execution

from datetime import datetime
from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

with DAG(
    dag_id='snowflake_etl',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    tags=['snowflake', 'etl'],
) as dag:

    create_schema = SnowflakeOperator(
        task_id='create_schema',
        snowflake_conn_id='snowflake_default',
        sql="""
            CREATE SCHEMA IF NOT EXISTS raw;
            CREATE SCHEMA IF NOT EXISTS staging;
            CREATE SCHEMA IF NOT EXISTS analytics;
        """,
    )

    create_table = SnowflakeOperator(
        task_id='create_orders_table',
        snowflake_conn_id='snowflake_default',
        sql="""
            CREATE TABLE IF NOT EXISTS staging.orders (
                order_id INTEGER,
                customer_id INTEGER,
                amount DECIMAL(10,2),
                order_date TIMESTAMP_NTZ,
                status VARCHAR(20)
            )
            CLUSTER BY (order_date);
        """,
    )

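    # Assumes the stage raw_stage already exists in the connection's schema.
    # Parquet loads into a multi-column table need MATCH_BY_COLUMN_NAME
    # (or a SELECT transformation); without it Snowflake rejects the COPY.
    load_data = SnowflakeOperator(
        task_id='load_orders',
        snowflake_conn_id='snowflake_default',
        sql="""
            COPY INTO staging.orders
            FROM @raw_stage/orders/
            FILE_FORMAT = (
                TYPE = 'PARQUET'
                COMPRESSION = 'SNAPPY'
            )
            PATTERN = '.*\\.parquet'
            MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
            ON_ERROR = 'SKIP_FILE';
        """,
    )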

    transform = SnowflakeOperator(
        task_id='transform_orders',
        snowflake_conn_id='snowflake_default',
        sql="""
            MERGE INTO analytics.fct_orders AS target
            USING (
                SELECT
                    order_id,
                    customer_id,
                    SUM(amount) as total_amount,
                    COUNT(*) as order_count,
                    MAX(order_date) as last_order_date
                FROM staging.orders
                WHERE order_date >= DATEADD(day, -1, CURRENT_DATE())
                GROUP BY order_id, customer_id
            ) AS source
            ON target.order_id = source.order_id
            WHEN MATCHED THEN UPDATE SET
                total_amount = source.total_amount,
                order_count = source.order_count,
                last_order_date = source.last_order_date
            WHEN NOT MATCHED THEN INSERT
                (order_id, customer_id, total_amount, order_count, last_order_date)
                VALUES (source.order_id, source.customer_id, source.total_amount,
                        source.order_count, source.last_order_date);
        """,
    )

    create_schema >> create_table >> load_data >> transform

GCS to Snowflake Loading

from airflow.providers.snowflake.operators.snowflake import (
    SnowflakeOperator,
)

# Step 1: Create a named stage pointing to GCS
create_stage = SnowflakeOperator(
    task_id='create_gcs_stage',
    snowflake_conn_id='snowflake_default',
    sql="""
        CREATE OR REPLACE STAGE raw.raw_stage
        URL = 'gcs://data-lake/raw/orders/'
        STORAGE_INTEGRATION = gcs_integration;
    """,
)

# Step 2: List files in stage
list_files = SnowflakeOperator(
    task_id='list_stage_files',
    snowflake_conn_id='snowflake_default',
    sql="LIST @raw.raw_stage;",
)

# Step 3: Load with COPY INTO
load_from_stage = SnowflakeOperator(
    task_id='load_from_stage',
    snowflake_conn_id='snowflake_default',
    sql="""
        COPY INTO staging.orders
        FROM @raw.raw_stage
        FILE_FORMAT = (
            TYPE = 'CSV'
            FIELD_DELIMITER = '|'
            SKIP_HEADER = 1
            NULL_IF = ('NULL', 'null', '')
        )
        PATTERN = 'orders_.*\\.csv'
        ON_ERROR = 'CONTINUE';
    """,
)

create_stage >> list_files >> load_from_stage
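COPY INTO returns one row per staged file, with columns such as file, status, rows_parsed, rows_loaded, and errors_seen. With ON_ERROR = 'CONTINUE' a file can end up PARTIALLY_LOADED, so it can be worth summarizing those rows before downstream tasks run. The sketch below assumes the rows have already been fetched as dictionaries keyed by column name; the helper name and the sample rows are made up for illustration.

```python
def summarize_copy_results(rows):
    """Aggregate per-file COPY INTO result rows (hypothetical helper)."""
    summary = {"files_by_status": {}, "rows_loaded": 0, "errors_seen": 0}
    for row in rows:
        status = row["status"]  # e.g. LOADED, PARTIALLY_LOADED, LOAD_FAILED
        by_status = summary["files_by_status"]
        by_status[status] = by_status.get(status, 0) + 1
        summary["rows_loaded"] += row["rows_loaded"]
        summary["errors_seen"] += row["errors_seen"]
    return summary


# Illustrative rows only; real values come from the COPY INTO result set.
sample_rows = [
    {"file": "orders_a.csv", "status": "LOADED",
     "rows_loaded": 100, "errors_seen": 0},
    {"file": "orders_b.csv", "status": "PARTIALLY_LOADED",
     "rows_loaded": 95, "errors_seen": 5},
]

copy_summary = summarize_copy_results(sample_rows)
print(copy_summary)
```

A downstream task could fail the run or raise an alert when `errors_seen` is non-zero, rather than letting partial loads pass silently.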

Pre/Post Operator Pattern

from airflow.providers.snowflake.operators.snowflake import (
    SnowflakeOperator,
)

# Each Airflow task opens its own Snowflake session, so session parameters
# set in one task do not carry over to the next. The "pre" statements
# therefore run in the same task (and session) as the main ETL call.
etl_task = SnowflakeOperator(
    task_id='run_etl',
    snowflake_conn_id='snowflake_default',
    sql="""
        ALTER SESSION SET TIMEZONE = 'UTC';
        ALTER SESSION SET STATEMENT_TIMEOUT_IN_SECONDS = 3600;
        ALTER SESSION SET QUERY_TAG = 'airflow_etl_{{ ds }}';
        CALL run_etl_procedure('{{ ds }}');
    """,
)

# Post-task: clean up temporary staging data.
# No UNSET is needed: the QUERY_TAG ends with the ETL task's session.
post_task = SnowflakeOperator(
    task_id='cleanup',
    snowflake_conn_id='snowflake_default',
    sql="TRUNCATE TABLE staging.temp_data;",
)

etl_task >> post_task

Multi-Warehouse Pattern

from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

warehouses = {
    'heavy_etl': 'COMPUTE_WH_HEAVY',
    'light_transform': 'COMPUTE_WH_LIGHT',
    'ad_hoc': 'COMPUTE_WH_ADHOC',
}

for task_name, warehouse in warehouses.items():
    SnowflakeOperator(
        task_id=f'task_{task_name}',
        snowflake_conn_id='snowflake_default',
        sql=f"""
            ALTER WAREHOUSE {warehouse} RESUME IF SUSPENDED;
            -- ETL logic here
        """,
        warehouse=warehouse,
    )

Key Concepts Table

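| Operator | Purpose | Key Parameters |
| --- | --- | --- |
| SnowflakeOperator | Execute SQL | sql, snowflake_conn_id |
| SnowflakeToGCSOperator | Export to GCS | snowflake_conn_id, gcs_bucket, sql |
| GCSToSnowflakeOperator | Load from GCS | bucket, prefix, table, stage |
| CopyFromExternalStageToSnowflakeOperator | COPY INTO | table, stage, file_format |
| SnowflakePrePostOperator | Pre/post hooks | sql, pre_query, post_query |
| SnowflakeSensor | Wait for condition | sql, poke_interval |

Operator availability and class names differ between provider releases, so confirm each name against the installed apache-airflow-providers-snowflake version before relying on it.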

COPY INTO Options

| Option | Description | Example |
| --- | --- | --- |
| FILE_FORMAT | File type specification | TYPE = 'PARQUET' |
| PATTERN | Regex for file matching | PATTERN = '.*\\.parquet' |
| ON_ERROR | Error handling strategy | SKIP_FILE, CONTINUE, ABORT_STATEMENT |
| FORCE | Reload existing files | FORCE = TRUE |
| SIZE_LIMIT | Max bytes per load | SIZE_LIMIT = 1073741824 |
| PURGE | Delete files after load | PURGE = TRUE |
| RETURN_FAILED_ONLY | Return only failed files | RETURN_FAILED_ONLY = TRUE |

Best Practices

  1. Use named stages for reusable external connections; avoid embedded credentials.
  2. Set ON_ERROR appropriately: SKIP_FILE for bad files, CONTINUE for partial loads.
  3. Partition large COPY INTO operations with SIZE_LIMIT to avoid single large transactions.
  4. Use PURGE = TRUE to clean up files after successful load.
  5. Set QUERY_TAG for monitoring and cost attribution per Airflow DAG.
  6. Monitor Snowpipe status with SHOW PIPES and SYSTEM$PIPE_STATUS.
  7. Use key-pair authentication for production; avoid password-based connections.
  8. Leverage clustering keys for frequently joined/filtered columns.
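For the QUERY_TAG practice, one option is a small structured tag that identifies the DAG, task, and run date, which makes query history easier to filter. The sketch below is one possible convention, not a provider feature: the function names and the JSON layout are assumptions. It relies on Snowflake limiting QUERY_TAG to 2000 characters.

```python
import json

MAX_QUERY_TAG_LENGTH = 2000  # Snowflake's documented limit for QUERY_TAG


def build_query_tag(dag_id, task_id, run_date):
    """Build a compact JSON query tag (hypothetical convention)."""
    tag = json.dumps(
        {"dag_id": dag_id, "task_id": task_id, "run_date": run_date},
        sort_keys=True,
        separators=(",", ":"),
    )
    if len(tag) > MAX_QUERY_TAG_LENGTH:
        raise ValueError("query tag exceeds 2000 characters")
    return tag


def alter_session_statement(tag):
    """Wrap the tag in an ALTER SESSION statement, escaping SQL string characters."""
    escaped = tag.replace("\\", "\\\\").replace("'", "''")
    return f"ALTER SESSION SET QUERY_TAG = '{escaped}';"


query_tag = build_query_tag("snowflake_etl", "load_orders", "2024-01-01")
print(alter_session_statement(query_tag))
```

Because session parameters last only for the session that set them, the generated statement belongs in the same task as the queries it should tag.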

With Snowpipe auto-ingest, Airflow's role shrinks to creating, managing, and monitoring pipes, while Snowflake handles the continuous loading itself. This shortens task runtimes and reduces worker resource usage.
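A monitoring task can stay lightweight by checking only the pipe's status. SYSTEM$PIPE_STATUS returns a JSON string whose fields include executionState and pendingFileCount. The sketch below assumes that string has already been fetched (for example with a SELECT on SYSTEM$PIPE_STATUS); the function name and the backlog threshold of 100 files are illustrative assumptions.

```python
import json


def pipe_needs_attention(status_json, max_pending_files=100):
    """Return True if the pipe is not running or its backlog looks too large."""
    status = json.loads(status_json)
    state = status.get("executionState")
    pending = status.get("pendingFileCount", 0)
    return state != "RUNNING" or pending > max_pending_files


# Illustrative payloads shaped like SYSTEM$PIPE_STATUS output.
healthy = '{"executionState": "RUNNING", "pendingFileCount": 0}'
paused = '{"executionState": "PAUSED", "pendingFileCount": 3}'

print(pipe_needs_attention(healthy))
print(pipe_needs_attention(paused))
```

A check like this polls pipe health rather than the data itself, which keeps the Airflow task short regardless of load volume.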

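Snowflake bills warehouse compute in credits on a per-second basis, with a 60-second minimum each time a warehouse starts or resumes. Suspend warehouses after heavy ETL with ALTER WAREHOUSE ... SUSPEND (or rely on AUTO_SUSPEND) to avoid idle costs. Airflow tasks can run the suspend/resume statements as pre/post operations.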
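As a rough worked example of the billing model, the sketch below estimates credits for a single warehouse run. It assumes standard warehouses at 1, 2, 4, 8, and 16 credits per hour for X-Small through X-Large, and a 60-second minimum per resume. The function name is hypothetical, and the price per credit is deliberately left out because it depends on edition and region.

```python
# Credits per hour for standard warehouse sizes.
CREDITS_PER_HOUR = {
    "XSMALL": 1,
    "SMALL": 2,
    "MEDIUM": 4,
    "LARGE": 8,
    "XLARGE": 16,
}
MINIMUM_BILLED_SECONDS = 60  # minimum charge each time a warehouse resumes


def estimate_credits(size, runtime_seconds):
    """Estimate credits for one resume-run-suspend cycle of a warehouse."""
    billed_seconds = max(runtime_seconds, MINIMUM_BILLED_SECONDS)
    return CREDITS_PER_HOUR[size] * billed_seconds / 3600


# A 15-minute job on a MEDIUM warehouse: 4 credits/hour * 0.25 hour.
print(estimate_credits("MEDIUM", 900))

# A 10-second query on an XSMALL warehouse is still billed for 60 seconds.
print(estimate_credits("XSMALL", 10))
```

The second call shows why many very short, separately resumed runs can cost more than their runtime suggests: each resume is billed for at least a minute.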

Key Takeaways:

  • SnowflakeOperator is the primary operator for Snowflake SQL operations; newer provider releases deprecate it in favor of SQLExecuteQueryOperator from the common SQL provider
  • COPY INTO loads data from external stages (S3, GCS, Azure) into Snowflake tables
  • Snowpipe provides continuous ingestion: use Airflow to manage pipes, not to poll for data
  • Set QUERY_TAG per DAG for cost attribution and query monitoring
  • Use key-pair authentication for production security
  • Monitor COPY INTO results with the COPY_HISTORY table function, the VALIDATE function, and query history
