Cross-Platform Integration: Spark, Airflow & dbt
Architecture Diagram 1: Spark Integration Architecture
SPARK-SNOWFLAKE INTEGRATION ARCHITECTURE

SPARK APPLICATION

  Spark Session Configuration:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, sum as sum_

    spark = SparkSession.builder \
        .appName("SnowflakeIntegration") \
        .config("spark.jars.packages",
                "net.snowflake:spark-snowflake_2.12:2.13.0-spark_3.3,"
                "net.snowflake:snowflake-jdbc:3.15.0") \
        .getOrCreate()

  Data Processing:

    # Connection options shared by reads and writes (placeholder values)
    sf_options = {
        "sfURL": "account.snowflakecomputing.com",
        "sfUser": "username",
        "sfPassword": "password",
        "sfDatabase": "db",
        "sfSchema": "schema",
    }

    # Read from Snowflake
    df = spark.read \
        .format("snowflake") \
        .options(**sf_options) \
        .option("dbtable", "source_table") \
        .load()

    # Transform data
    transformed_df = df.filter(col("amount") > 1000) \
        .groupBy("region") \
        .agg(sum_("amount").alias("total_amount"))

    # Write back to Snowflake
    transformed_df.write \
        .format("snowflake") \
        .options(**sf_options) \
        .option("dbtable", "target_table") \
        .mode("overwrite") \
        .save()

        │  Connector Layer
        ▼

SNOWFLAKE CONNECTOR FOR SPARK

  Connector Features:
    • Pushdown predicates to Snowflake
    • Partition pruning
    • Parallel data loading
    • Schema inference
    • Type mapping
    • Temporary tables for staging

  Configuration Options:
    • sfURL
    • sfUser
    • sfPassword
    • sfDatabase
    • sfSchema
    • sfRole
    • sfWarehouse
    • parallelism (thread pool size for upload/download)

        │  Data Flow
        ▼

DATA PIPELINE

  Typical ETL Pattern:

    Snowflake ──▶ Spark Processing ──▶ Snowflake
     (Read)         (Transform)         (Write)

    1. Extract: Read raw data from Snowflake
    2. Transform: Apply Spark transformations
    3. Load: Write processed data back to Snowflake
    4. Validate: Verify data quality

  Alternative: Snowpark Pushdown

    # Use Snowpark for transformations that can
    # execute entirely in Snowflake
    from snowflake.snowpark import Session
    from snowflake.snowpark.functions import col

    session = Session.builder.configs({...}).create()
    df = session.table("source")
    result = df.filter(col("amount") > 1000)
    result.write.mode("overwrite").save_as_table("target")

Architecture Diagram 2: Airflow Integration Architecture
AIRFLOW-SNOWFLAKE INTEGRATION ARCHITECTURE

AIRFLOW ORCHESTRATION

  Airflow DAG Definition:

    from airflow import DAG
    from airflow.providers.snowflake.operators.snowflake \
        import SnowflakeOperator

    with DAG('snowflake_pipeline',
             schedule_interval='@daily',
             default_args=default_args) as dag:

        extract = SnowflakeOperator(
            task_id='extract_data',
            sql='SELECT * FROM source_table',
            snowflake_conn_id='snowflake_default',
            warehouse='ETL_WH',
            database='RAW_DB',
            schema='PUBLIC'
        )

        transform = SnowflakeOperator(
            task_id='transform_data',
            sql='transform_query.sql',
            parameters={'date': '{{ ds }}'}
        )

        # Hypothetical load step; the file name is illustrative
        load = SnowflakeOperator(
            task_id='load_data',
            sql='load_query.sql'
        )

        extract >> transform >> load

        │  Operators & Hooks
        ▼

AIRFLOW-SNOWFLAKE COMPONENTS

  Operators:

    SnowflakeOperator:
      • Execute SQL statements
      • Support multiple statements
      • Parameterized queries
      • Transaction support

    SnowflakeToS3Operator:
      • Export data to S3
      • Supports Parquet, CSV, JSON
      • Configurable compression

    S3ToSnowflakeOperator:
      • Import data from S3
      • COPY INTO support
      • File format handling

  Hooks:

    SnowflakeHook:
      • Connection management
      • Credential handling
      • SQL execution
      • Database cursors

  Connection Configuration:

    conn_id: snowflake_default
    conn_type: snowflake
    host: account.snowflakecomputing.com
    schema: public
    login: username
    password: ****
    extra: {"database": "mydb", "warehouse": "mywh"}

        │  Pipeline Execution
        ▼

END-TO-END PIPELINE

    Extract Data (Source Tables)
      ──▶ Validate Data (Data Quality)
      ──▶ Transform Data (Spark/Snowpark)
      ──▶ Load Data (Target Tables)
      ──▶ Notify Users (Email/Slack)

Architecture Diagram 3: dbt Integration Architecture
DBT-SNOWFLAKE INTEGRATION ARCHITECTURE

DBT PROJECT STRUCTURE

    my_dbt_project/
    ├── dbt_project.yml
    ├── profiles.yml
    ├── models/
    │   ├── staging/
    │   │   ├── stg_raw_sales.sql
    │   │   ├── stg_raw_customers.sql
    │   │   └── _staging__models.yml
    │   ├── intermediate/
    │   │   ├── int_sales_with_customers.sql
    │   │   └── _intermediate__models.yml
    │   └── marts/
    │       ├── fct_orders.sql
    │       ├── dim_customers.sql
    │       └── _marts__models.yml
    ├── tests/
    │   ├── generic/
    │   └── singular/
    ├── macros/
    │   └── generate_schema_name.sql
    └── seeds/
        └── country_codes.csv

        │  dbt Commands
        ▼

DBT EXECUTION

  dbt run: Execute models

    $ dbt run

    Running with dbt=1.7.0
    Found 8 models, 3 tests, 2 sources

    1 of 8 START view model public.stg_raw_sales ........
    1 of 8 OK created view model public.stg_raw_sales
    2 of 8 START view model public.stg_raw_customers ....
    2 of 8 OK created view model public.stg_raw_customers
    ...
    8 of 8 OK created table model public.fct_orders

    Done. PASS=8 WARN=0 ERROR=0 SKIP=0

  dbt test: Run data quality tests

    $ dbt test

    Running with dbt=1.7.0
    Found 3 tests

    1 of 3 START test not_null_orders_order_id ...........
    1 of 3 PASS not_null_orders_order_id
    2 of 3 START test unique_orders_order_id ..............
    2 of 3 PASS unique_orders_order_id
    3 of 3 START test relationships_orders_customer_id ...
    3 of 3 PASS relationships_orders_customer_id

    Done. PASS=3 WARN=0 ERROR=0 SKIP=0

        │  Generated SQL
        ▼

SNOWFLAKE SQL EXECUTION

  Staging Model (stg_raw_sales.sql):

    with source as (
        select * from {{ source('raw', 'sales') }}
    ),
    renamed as (
        select
            order_id as order_id,
            customer_id as customer_id,
            order_date as order_date,
            amount as order_amount,
            status as order_status
        from source
    )
    select * from renamed

    -- Compiles to:
    -- select order_id, customer_id, order_date,
    --        amount as order_amount, status as order_status
    -- from raw.sales

  Mart Model (fct_orders.sql):

    with orders as (
        select * from {{ ref('stg_raw_sales') }}
    ),
    customers as (
        select * from {{ ref('dim_customers') }}
    ),
    final as (
        select
            orders.order_id,
            orders.customer_id,
            customers.customer_name,
            orders.order_date,
            orders.order_amount,
            orders.order_status
        from orders
        left join customers
            on orders.customer_id = customers.customer_id
    )
    select * from final

External Functions let SQL queries call external APIs and custom logic. The remote service runs outside Snowflake and is reached over HTTPS through an API integration and a cloud API gateway, which supports data enrichment and integration with third-party systems without leaving Snowflake SQL.
Snowpark Container Services run custom containers (Docker) directly on Snowflake, enabling any language, library, or runtime (ML models, ETL tools, custom applications) within Snowflake's managed infrastructure without external hosting.
Use Snowpark Container Services for ML model serving and custom runtimes. Use External Functions for lightweight API integrations. Use External Tables with streams for near-real-time data lake ingestion.
- External Functions: Call REST APIs, custom logic from SQL
- Snowpark Containers: Run any Docker container on Snowflake infrastructure
- External Tables: Query cloud storage files directly (Parquet, JSON, ORC)
- Data sharing: Zero-copy sharing across Snowflake accounts
- Stream processing: Real-time ingestion via streams + tasks + external functions
Detailed Explanation
Spark Integration
Apache Spark integration with Snowflake enables large-scale data processing using Spark's distributed compute capabilities while leveraging Snowflake's managed data platform. The Snowflake Connector for Spark provides native read/write capabilities, predicate pushdown, and schema inference. This integration is ideal for complex transformations that benefit from Spark's in-memory processing or require Spark-specific libraries (MLlib, GraphX).
The connector supports pushdown predicates, where Spark filters are translated to Snowflake SQL and executed during data retrieval. This optimization reduces data transfer between Snowflake and Spark, improving performance for filtered queries. Partition pruning leverages Snowflake's micro-partition metadata to eliminate irrelevant data during reads.
Data loading uses Snowflake's COPY command for efficient bulk loading, with configurable parallelism. The connector stages the data as files, by default in a temporary internal stage that it creates and cleans up itself, and then manages the COPY INTO step against the target table. For very small datasets the staging overhead can outweigh the benefit, and direct INSERT operations may be more efficient than COPY loading.
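
The connector is configured through `sf`-prefixed options such as `sfURL` and `sfUser`. As a minimal sketch of keeping credentials out of the code, the helper below builds that options dictionary from environment variables. The environment variable names and the `snowflake_options` helper are assumptions made for illustration, not part of the connector.

```python
import os

# Hypothetical environment variable names mapped to connector option names.
_ENV_TO_OPTION = {
    "SNOWFLAKE_URL": "sfURL",
    "SNOWFLAKE_USER": "sfUser",
    "SNOWFLAKE_PASSWORD": "sfPassword",
    "SNOWFLAKE_DATABASE": "sfDatabase",
    "SNOWFLAKE_SCHEMA": "sfSchema",
    "SNOWFLAKE_WAREHOUSE": "sfWarehouse",
    "SNOWFLAKE_ROLE": "sfRole",
}


def snowflake_options(env=None, required=("sfURL", "sfUser", "sfPassword")):
    """Build the connector options dict from environment variables.

    Unset or empty variables are skipped; missing required options raise KeyError.
    """
    env = os.environ if env is None else env
    options = {opt: env[var] for var, opt in _ENV_TO_OPTION.items() if env.get(var)}
    missing = [opt for opt in required if opt not in options]
    if missing:
        raise KeyError("Missing Snowflake connector options: " + ", ".join(missing))
    return options


# Usage with PySpark (needs the connector on the classpath, so it is left commented):
# df = (spark.read.format("snowflake")
#       .options(**snowflake_options())
#       .option("dbtable", "sales_data")
#       .load())
```

Passing a plain dictionary as `env` makes the helper easy to exercise in unit tests without touching the real environment.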
Airflow Integration
Apache Airflow orchestrates Snowflake data pipelines through operators, hooks, and sensors. The SnowflakeOperator executes SQL statements with support for parameterized queries, transactions, and multi-statement execution. The operator handles connection management, error handling, and result processing.
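Transfer operators enable data movement between Snowflake and cloud storage (S3, Azure Blob, GCS). The SnowflakeToS3Operator exports query results to cloud storage in various formats (Parquet, CSV, JSON), while S3ToSnowflakeOperator imports data from cloud storage using COPY commands. In recent releases of the Snowflake provider, S3ToSnowflakeOperator has been superseded by CopyFromExternalStageToSnowflakeOperator, so check the installed provider version before relying on a specific class name. These operators are essential for building data lakes and integrating with external systems.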
Hooks provide low-level connection management and SQL execution capabilities. The SnowflakeHook manages authentication, session handling, and cursor operations. Hooks can be used in custom operators, sensors, and Python functions for advanced pipeline requirements.
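
As a sketch of using a hook inside a plain Python function, the check below only relies on the hook exposing `get_first(sql, parameters=...)`, which `SnowflakeHook` inherits from Airflow's DB-API hook. The function names, the `load_date` column, and the `FakeHook` stand-in are assumptions for illustration; the stand-in lets the example run without Airflow installed.

```python
def fetch_row_count(hook, table, load_date):
    """Count rows loaded for one date, binding the date as a query parameter."""
    # Table names cannot be bound as parameters, so validate before interpolating.
    if not table.replace("_", "").replace(".", "").isalnum():
        raise ValueError(f"Unexpected table name: {table!r}")
    sql = f"SELECT COUNT(*) FROM {table} WHERE load_date = %(load_date)s"
    row = hook.get_first(sql, parameters={"load_date": load_date})
    return int(row[0]) if row else 0


def assert_min_rows(hook, table, load_date, min_rows=1):
    """Raise ValueError when fewer than min_rows rows were loaded."""
    count = fetch_row_count(hook, table, load_date)
    if count < min_rows:
        raise ValueError(f"{table}: expected at least {min_rows} rows, found {count}")
    return count


class FakeHook:
    """Stand-in for SnowflakeHook that records calls and returns a fixed count."""

    def __init__(self, count):
        self.count = count
        self.calls = []

    def get_first(self, sql, parameters=None):
        self.calls.append((sql, parameters))
        return (self.count,)


hook = FakeHook(42)
print(assert_min_rows(hook, "raw_sales", "2024-01-01"))
```

In a DAG, the same function could be wrapped in a PythonOperator and given `SnowflakeHook(snowflake_conn_id='snowflake_default')` in place of the fake.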
dbt Integration
dbt (data build tool) provides a SQL-based transformation framework that integrates natively with Snowflake. dbt models are SQL SELECT statements that are materialized as views or tables in Snowflake. The framework handles dependency management, incremental processing, testing, and documentation generation.
Staging models clean and standardize raw data, applying naming conventions and data type conversions. Intermediate models combine staging models into business-ready datasets. Mart models create final analytical tables optimized for reporting and dashboards. This layered approach improves maintainability and data quality.
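
To illustrate how references between layers imply a run order, here is a toy sketch that extracts `ref()` calls with a regular expression and sorts the models topologically. This is not how dbt parses projects internally; the regex, the helper names, and the dependency of `dim_customers` on `stg_raw_customers` are assumptions for illustration.

```python
import re
from graphlib import TopologicalSorter  # Python 3.9+

# Matches {{ ref('model_name') }} with single or double quotes.
REF_PATTERN = re.compile(r"""\{\{\s*ref\(\s*['"]([^'"]+)['"]\s*\)\s*\}\}""")


def extract_refs(sql):
    """Return the model names referenced through ref() in a model's SQL."""
    return REF_PATTERN.findall(sql)


def build_order(models):
    """Return a run order in which every model comes after the models it refs."""
    graph = {name: set(extract_refs(sql)) for name, sql in models.items()}
    return list(TopologicalSorter(graph).static_order())


models = {
    "fct_orders": (
        "select * from {{ ref('stg_raw_sales') }} "
        "left join {{ ref('dim_customers') }} using (customer_id)"
    ),
    "dim_customers": "select * from {{ ref('stg_raw_customers') }}",
    "stg_raw_sales": "select * from {{ source('raw', 'sales') }}",
    "stg_raw_customers": "select * from {{ source('raw', 'customers') }}",
}

order = build_order(models)
print(order)
```

Staging models have no `ref()` dependencies, so they sort first, and the mart model `fct_orders` always follows both of its inputs.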
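dbt tests validate data quality by checking for nulls, uniqueness, referential integrity, and custom business rules. Each test compiles to a SQL query that selects failing rows, so a test passes when the query returns nothing, which makes data quality issues quick to locate. By default only the pass/fail outcome is reported; running dbt test with the --store-failures flag persists the failing rows in Snowflake tables for reporting and alerting.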
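
The "test as a query that returns failing rows" idea can be sketched outside dbt. The example below uses Python's built-in sqlite3 as a stand-in for Snowflake; the table contents and the hand-written test queries are assumptions for illustration and only approximate what dbt's generic tests compile to.

```python
import sqlite3


def failing_rows(conn, test_sql):
    """Run a test query; any returned row is a failure."""
    return conn.execute(test_sql).fetchall()


conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE fct_orders (order_id INTEGER, customer_id INTEGER);
    INSERT INTO fct_orders VALUES (1, 10), (2, 11), (2, 12), (NULL, 13);
""")

# Each test selects the rows that violate the rule.
tests = {
    "not_null_order_id": "SELECT * FROM fct_orders WHERE order_id IS NULL",
    "unique_order_id": (
        "SELECT order_id FROM fct_orders "
        "WHERE order_id IS NOT NULL "
        "GROUP BY order_id HAVING COUNT(*) > 1"
    ),
}

results = {name: failing_rows(conn, sql) for name, sql in tests.items()}
for name, rows in results.items():
    status = "PASS" if not rows else f"FAIL ({len(rows)} failing rows)"
    print(f"{name}: {status}")
```

Because the sample data contains one NULL and one duplicated `order_id`, both tests report a failure here, and the returned rows point directly at the offending records.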
Integration Patterns
ELT Pattern: Extract data from source systems, load it into raw tables in Snowflake, then transform it using dbt or Spark. With dbt, the transformations run on Snowflake's compute, and the raw tables remain available for auditing.
Real-time Integration: Use Snowpipe for continuous data loading, combined with Streams and Tasks for near-real-time processing. Airflow or custom applications orchestrate the surrounding pipeline.
Hybrid Processing: Use Snowpark for transformations that can execute within Snowflake, and Spark for complex transformations requiring external libraries. This pattern optimizes compute usage while maintaining flexibility.
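
A transform step in these patterns often processes only rows that arrived since the last run. The sketch below shows a high-watermark incremental load, using Python's built-in sqlite3 as a stand-in for Snowflake; the table and column names are assumptions for illustration. It is insert-only and would skip rows that share the watermark timestamp, so a production version would need a merge strategy.

```python
import sqlite3


def incremental_load(conn):
    """Insert raw rows newer than the target's latest updated_at; return rows added."""
    before = conn.execute("SELECT COUNT(*) FROM fct_orders").fetchone()[0]
    conn.execute("""
        INSERT INTO fct_orders (order_id, order_amount, updated_at)
        SELECT order_id, amount, updated_at
        FROM raw_orders
        WHERE updated_at > (SELECT COALESCE(MAX(updated_at), '') FROM fct_orders)
    """)
    conn.commit()
    after = conn.execute("SELECT COUNT(*) FROM fct_orders").fetchone()[0]
    return after - before


conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE raw_orders (order_id INTEGER, amount REAL, updated_at TEXT);
    CREATE TABLE fct_orders (order_id INTEGER, order_amount REAL, updated_at TEXT);
    INSERT INTO raw_orders VALUES
        (1, 1200.0, '2024-01-01T00:00:00'),
        (2,  800.0, '2024-01-02T00:00:00');
""")

first_run = incremental_load(conn)   # target is empty, so all raw rows are loaded
conn.execute("INSERT INTO raw_orders VALUES (3, 500.0, '2024-01-03T00:00:00')")
second_run = incremental_load(conn)  # only the new row is newer than the watermark
third_run = incremental_load(conn)   # nothing new to load
print(first_run, second_run, third_run)
```

The raw table is never modified, which matches the ELT goal of keeping source data intact for auditing.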
Key Concepts Table
| Integration | Primary Use | Strengths | Limitations |
|---|---|---|---|
| Spark | Large-scale processing | ML libraries, complex transforms | Requires Spark cluster |
| Airflow | Orchestration | Rich operators, scheduling | Steep learning curve |
| dbt | SQL transformations | Version control, testing | SQL-centric (Python models run via Snowpark) |

| Component | Purpose | Configuration |
|---|---|---|
| Spark Connector | Data read/write | Snowflake connection options |
| Airflow Operator | Task execution | SQL, parameters, connections |
| dbt Model | Data transformation | SQL, Jinja templating |

| Metric | Spark | Airflow | dbt |
|---|---|---|---|
| Setup Complexity | High | Medium | Low |
| Learning Curve | High | Medium | Low |
| Flexibility | Very High | High | Medium |
| Performance | Very High | N/A | High |
Code Examples
# Example 1: Spark-Snowflake read and write (PySpark)
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("SnowflakeRead") \
    .config("spark.jars.packages",
            "net.snowflake:spark-snowflake_2.12:2.13.0-spark_3.3") \
    .getOrCreate()

# Connection options shared by the read and the write.
# The connector expects sf-prefixed option names.
sf_options = {
    "sfURL": "myaccount.snowflakecomputing.com",
    "sfUser": "myuser",
    "sfPassword": "mypassword",  # placeholder; load from a secret store in practice
    "sfDatabase": "ANALYTICS_DB",
    "sfSchema": "PUBLIC",
    "sfWarehouse": "ANALYTICS_WH",
}

# Read from Snowflake
df = spark.read \
    .format("snowflake") \
    .options(**sf_options) \
    .option("dbtable", "sales_data") \
    .load()

# Transform: alias the aggregate so the output column has a plain name
transformed_df = df.filter("amount > 1000") \
    .groupBy("region") \
    .agg(F.sum("amount").alias("total_amount"))

# Write back; the write needs the same connection options as the read
transformed_df.write \
    .format("snowflake") \
    .options(**sf_options) \
    .option("dbtable", "sales_summary") \
    .mode("overwrite") \
    .save()

# Example 2: Airflow DAG for Snowflake pipeline
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email': ['data-team@company.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG('snowflake_etl_pipeline',
         default_args=default_args,
         schedule_interval='@daily',
         catchup=False) as dag:

    # Copy the day's rows from staging into the raw table
    extract_data = SnowflakeOperator(
        task_id='extract_raw_data',
        sql="""
            INSERT INTO raw_sales
            SELECT * FROM staging_sales
            WHERE load_date = '{{ ds }}'
        """,
        snowflake_conn_id='snowflake_default',
        warehouse='ETL_WH',
        database='RAW_DB',
        schema='PUBLIC'
    )

    # Runs the count query; on its own this does not fail the task on a zero count
    validate_data = SnowflakeOperator(
        task_id='validate_data',
        sql="""
            SELECT COUNT(*) as row_count
            FROM raw_sales
            WHERE load_date = '{{ ds }}'
        """,
        snowflake_conn_id='snowflake_default'
    )

    # A value ending in .sql is loaded as a template file from the DAG's search path
    transform_data = SnowflakeOperator(
        task_id='transform_data',
        sql="transform_query.sql",
        parameters={'execution_date': '{{ ds }}'},
        snowflake_conn_id='snowflake_default'
    )

    extract_data >> validate_data >> transform_data

-- Example 3: dbt model (staging/stg_orders.sql)
with source as (
    select * from {{ source('raw', 'orders') }}
),
renamed as (
    select
        order_id,
        customer_id,
        order_date,
        amount as order_amount,
        status as order_status,
        created_at,
        updated_at
    from source
)
select * from renamed

-- Example 4: dbt model (marts/fct_orders.sql)
with orders as (
    select * from {{ ref('stg_orders') }}
),
customers as (
    select * from {{ ref('dim_customers') }}
),
final as (
    select
        orders.order_id,
        orders.customer_id,
        customers.customer_name,
        customers.customer_segment,
        orders.order_date,
        orders.order_amount,
        orders.order_status,
        DATE_TRUNC('month', orders.order_date) as order_month
    from orders
    left join customers
        on orders.customer_id = customers.customer_id
)
select * from final

# Example 5: Airflow custom operator for Snowflake
from airflow.models import BaseOperator
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook


class SnowflakeDataQualityOperator(BaseOperator):
    """Fail the task when a single-value SQL check differs from the expected result."""

    # The apply_defaults decorator is not needed in Airflow 2, so it is omitted.
    def __init__(
        self,
        sql,
        expected_result,
        snowflake_conn_id='snowflake_default',
        **kwargs
    ):
        super().__init__(**kwargs)
        self.sql = sql
        self.expected_result = expected_result
        self.snowflake_conn_id = snowflake_conn_id

    def execute(self, context):
        hook = SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id)
        result = hook.get_first(self.sql)
        if result is None:
            raise ValueError("Data quality check returned no rows")
        if result[0] != self.expected_result:
            raise ValueError(
                f"Data quality check failed: {result[0]} != {self.expected_result}"
            )
        return result[0]


# Usage in DAG
quality_check = SnowflakeDataQualityOperator(
    task_id='check_row_count',
    sql="SELECT COUNT(*) FROM fct_orders WHERE order_date = CURRENT_DATE()",
    expected_result=1000,
    snowflake_conn_id='snowflake_default'
)

-- Example 6: dbt test (tests/not_null_order_id.sql)
SELECT *
FROM {{ ref('fct_orders') }}
WHERE order_id IS NULL
# Example 7: dbt source definition (models/sources.yml)
version: 2

sources:
  - name: raw
    database: RAW_DB
    schema: PUBLIC
    tables:
      - name: orders
        loaded_at_field: updated_at
        freshness:
          warn_after: {count: 12, period: hour}
          error_after: {count: 24, period: hour}
      - name: customers
        columns:
          - name: customer_id
            tests:
              - unique
              - not_null

Performance Metrics
| Metric | Spark | Airflow | dbt |
|---|---|---|---|
| Setup Time | 2-4 hours | 1-2 hours | 30 min |
| Learning Curve | 2-4 weeks | 1-2 weeks | 3-5 days |
| Pipeline Speed | Very Fast | Moderate | Fast |
| Maintenance | High | Medium | Low |
Best Practices
- Choose the right tool: Use Spark for complex transformations requiring ML libraries, Airflow for orchestration, and dbt for SQL transformations.
- Use Snowpark when possible: Prefer Snowpark over Spark for transformations that can execute within Snowflake to minimize data movement.
- Implement incremental processing: Use dbt incremental models and Airflow backfill capabilities to process only new data.
- Test thoroughly: Implement data quality tests in dbt and validation checks in Airflow to catch issues early.
- Monitor pipelines: Track pipeline execution times, error rates, and resource usage for optimization.
- Version control: Use Git for all pipeline code, including dbt models, Airflow DAGs, and Spark applications.
- Document dependencies: Maintain clear documentation of data lineage, dependencies, and transformation logic.
- Optimize performance: Use appropriate partitioning, clustering, and warehouse sizing for each pipeline component.
- Implement error handling: Add retry logic, alerting, and fallback mechanisms for production pipelines.
- Regular reviews: Conduct weekly pipeline reviews to identify optimization opportunities and address issues proactively.
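
The retry logic mentioned in the error-handling practice can be sketched as a small helper with exponential backoff. This is a generic illustration and not an Airflow or Snowflake API; the `run_with_retries` name and its parameters are assumptions. Inside Airflow, the `retries` and `retry_delay` task arguments already cover the same need at the task level.

```python
import time


def run_with_retries(task, retries=3, base_delay=1.0, sleep=time.sleep,
                     retry_on=(Exception,)):
    """Call task(); on failure wait base_delay * 2**attempt and try again.

    At most `retries` retries are made after the first attempt; the last
    exception is re-raised once they are exhausted.
    """
    attempt = 0
    while True:
        try:
            return task()
        except retry_on:
            if attempt >= retries:
                raise
            sleep(base_delay * (2 ** attempt))
            attempt += 1


# Demonstration with a task that fails twice before succeeding.
calls = {"n": 0}
delays = []


def flaky_task():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "loaded"


# Record delays instead of sleeping so the demo finishes instantly.
outcome = run_with_retries(flaky_task, sleep=delays.append)
print(outcome, delays)
```

Injecting the `sleep` function keeps the helper testable; in production the default `time.sleep` applies the real delay.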
See Also
- PySpark Iceberg - Spark integration patterns
- Delta Lake on Databricks - Delta Lake integration
- Data Warehouse Concepts - Data warehouse design principles