Cross-Platform Integration: Spark, Airflow & dbt

Free Lesson

Advertisement

Cross-Platform Integration: Spark, Airflow & dbt

Architecture Diagram 1: Spark Integration Architecture

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    SPARK-SNOWFLAKE INTEGRATION ARCHITECTURE                  β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                             β”‚
β”‚  SPARK APPLICATION                                                          β”‚
β”‚  ══════════════════                                                         β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                                                                      β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚
β”‚  β”‚  β”‚  Spark Session Configuration:                                β”‚   β”‚   β”‚
β”‚  β”‚  β”‚                                                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  from pyspark.sql import SparkSession                         β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  from snowflake.connector import SnowflakeConnection         β”‚   β”‚   β”‚
β”‚  β”‚  β”‚                                                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  spark = SparkSession.builder \                              β”‚   β”‚   β”‚
β”‚  β”‚  β”‚    .appName("SnowflakeIntegration") \                        β”‚   β”‚   β”‚
β”‚  β”‚  β”‚    .config("spark.jars.packages",                            β”‚   β”‚   β”‚
β”‚  β”‚  β”‚            "net.snowflake:snowflake-jdbc:3.15.0") \          β”‚   β”‚   β”‚
β”‚  β”‚  β”‚    .getOrCreate()                                            β”‚   β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚
β”‚  β”‚                                                                      β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚
β”‚  β”‚  β”‚  Data Processing:                                            β”‚   β”‚   β”‚
β”‚  β”‚  β”‚                                                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  # Read from Snowflake                                       β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  df = spark.read \                                            β”‚   β”‚   β”‚
β”‚  β”‚  β”‚    .format("snowflake") \                                    β”‚   β”‚   β”‚
β”‚  β”‚  β”‚    .option("url", "snowflake://account/db/schema") \         β”‚   β”‚   β”‚
β”‚  β”‚  β”‚    .option("user", "username") \                             β”‚   β”‚   β”‚
β”‚  β”‚  β”‚    .option("password", "password") \                         β”‚   β”‚   β”‚
β”‚  β”‚  β”‚    .option("dbtable", "source_table") \                      β”‚   β”‚   β”‚
β”‚  β”‚  β”‚    .load()                                                   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚                                                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  # Transform data                                            β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  transformed_df = df.filter(col("amount") > 1000) \         β”‚   β”‚   β”‚
β”‚  β”‚  β”‚    .groupBy("region") \                                      β”‚   β”‚   β”‚
β”‚  β”‚  β”‚    .agg(sum("amount").alias("total_amount"))                 β”‚   β”‚   β”‚
β”‚  β”‚  β”‚                                                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  # Write back to Snowflake                                   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  transformed_df.write \                                      β”‚   β”‚   β”‚
β”‚  β”‚  β”‚    .format("snowflake") \                                    β”‚   β”‚   β”‚
β”‚  β”‚  β”‚    .option("url", "snowflake://account/db/schema") \         β”‚   β”‚   β”‚
β”‚  β”‚  β”‚    .option("dbtable", "target_table") \                      β”‚   β”‚   β”‚
β”‚  β”‚  β”‚    .mode("overwrite") \                                      β”‚   β”‚   β”‚
β”‚  β”‚  β”‚    .save()                                                   β”‚   β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                    β”‚                                        β”‚
β”‚                                    β”‚  Connector Layer                       β”‚
β”‚                                    β–Ό                                        β”‚
β”‚  SNOWFLAKE CONNECTOR FOR SPARK                                               β”‚
β”‚  ═══════════════════════════════                                              β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                                                                      β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚
β”‚  β”‚  β”‚  Connector Features:                                         β”‚   β”‚   β”‚
β”‚  β”‚  β”‚                                                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ Pushdown predicates to Snowflake                    β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ Partition pruning                                    β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ Parallel data loading                                β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ Schema inference                                     β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ Type mapping                                         β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ Temporary tables for staging                         β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚                                                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  Configuration Options:                                      β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ snowflake.jdbc.net.snowflake.account.url           β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ snowflake.user                                       β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ snowflake.password                                   β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ snowflake.database                                   β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ snowflake.schema                                     β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ snowflake.role                                       β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ snowflake.warehouse                                  β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ snowflake.numεˆ†εŒΊ (parallelism)                      β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚   β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                    β”‚                                        β”‚
β”‚                                    β”‚  Data Flow                             β”‚
β”‚                                    β–Ό                                        β”‚
β”‚  DATA PIPELINE                                                               β”‚
β”‚  ══════════════                                                             β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                                                                      β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚
β”‚  β”‚  β”‚  Typical ETL Pattern:                                         β”‚   β”‚   β”‚
β”‚  β”‚  β”‚                                                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  Snowflake ──▢ Spark Processing ──▢ Snowflake                β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  (Read)         (Transform)         (Write)                   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚                                                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  1. Extract: Read raw data from Snowflake              β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  2. Transform: Apply Spark transformations             β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  3. Load: Write processed data back to Snowflake       β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  4. Validate: Verify data quality                      β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚                                                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  Alternative: Snowpark Pushdown                              β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  # Use Snowpark for transformations that can           β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  # execute entirely in Snowflake                       β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  from snowflake.snowpark import Session                 β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  session = Session.builder.configs({...}).create()      β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  df = session.table("source")                           β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  result = df.filter(col("amount") > 1000)              β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  result.write.mode("overwrite").save_as_table("target")β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚   β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                             β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Architecture Diagram 2: Airflow Integration Architecture

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    AIRFLOW-SNOWFLAKE INTEGRATION ARCHITECTURE                β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                             β”‚
β”‚  AIRFLOW ORCHESTRATION                                                      β”‚
β”‚  ══════════════════════                                                     β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                                                                      β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚
β”‚  β”‚  β”‚  Airflow DAG Definition:                                     β”‚   β”‚   β”‚
β”‚  β”‚  β”‚                                                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  from airflow import DAG                                      β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  from airflow.providers.snowflake.operators.snowflake \      β”‚   β”‚   β”‚
β”‚  β”‚  β”‚    import SnowflakeOperator                                   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  from airflow.providers.snowflake.transfers.snowflake_to_s3 \β”‚   β”‚   β”‚
β”‚  β”‚  β”‚    import SnowflakeToS3Operator                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚                                                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  with DAG('snowflake_pipeline',                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚           schedule_interval='@daily',                         β”‚   β”‚   β”‚
β”‚  β”‚  β”‚           default_args=default_args) as dag:                  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚                                                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚      extract = SnowflakeOperator(                             β”‚   β”‚   β”‚
β”‚  β”‚  β”‚          task_id='extract_data',                              β”‚   β”‚   β”‚
β”‚  β”‚  β”‚          sql='SELECT * FROM source_table',                    β”‚   β”‚   β”‚
β”‚  β”‚  β”‚          snowflake_conn_id='snowflake_default',               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚          warehouse='ETL_WH',                                  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚          database='RAW_DB',                                   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚          schema='PUBLIC'                                      β”‚   β”‚   β”‚
β”‚  β”‚  β”‚      )                                                        β”‚   β”‚   β”‚
β”‚  β”‚  β”‚                                                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚      transform = SnowflakeOperator(                           β”‚   β”‚   β”‚
β”‚  β”‚  β”‚          task_id='transform_data',                            β”‚   β”‚   β”‚
β”‚  β”‚  β”‚          sql='transform_query.sql',                           β”‚   β”‚   β”‚
β”‚  β”‚  β”‚          parameters={'date': '{{ ds }}'}                      β”‚   β”‚   β”‚
β”‚  β”‚  β”‚      )                                                        β”‚   β”‚   β”‚
β”‚  β”‚  β”‚                                                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚      extract >> transform >> load                             β”‚   β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                    β”‚                                        β”‚
β”‚                                    β”‚  Operators & Hooks                     β”‚
β”‚                                    β–Ό                                        β”‚
β”‚  AIRFLOW-SNOWFLAKE COMPONENTS                                                β”‚
β”‚  ═════════════════════════════                                               β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                                                                      β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚
β”‚  β”‚  β”‚  Operators:                                                   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚                                                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  SnowflakeOperator:                                     β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ Execute SQL statements                               β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ Support multiple statements                          β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ Parameterized queries                                β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ Transaction support                                  β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚                                                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  SnowflakeToS3Operator:                                 β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ Export data to S3                                    β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ Supports Parquet, CSV, JSON                          β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ Configurable compression                             β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚                                                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  S3ToSnowflakeOperator:                                 β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ Import data from S3                                  β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ COPY INTO support                                    β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ File format handling                                 β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚   β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚
β”‚  β”‚                                                                      β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚
β”‚  β”‚  β”‚  Hooks:                                                       β”‚   β”‚   β”‚
β”‚  β”‚  β”‚                                                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  SnowflakeHook:                                         β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ Connection management                                β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ Credential handling                                  β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ SQL execution                                        β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  β€’ Database cursors                                     β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚                                                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  Connection Configuration:                                   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  conn_id: snowflake_default                             β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  conn_type: snowflake                                   β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  host: account.snowflakecomputing.com                  β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  schema: public                                         β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  login: username                                        β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  password: ****                                         β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  extra: {"database": "mydb", "warehouse": "mywh"}      β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚   β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                    β”‚                                        β”‚
β”‚                                    β”‚  Pipeline Execution                   β”‚
β”‚                                    β–Ό                                        β”‚
β”‚  END-TO-END PIPELINE                                                         β”‚
β”‚  ═══════════════════                                                         β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                                                                      β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚
β”‚  β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β” β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚ Extract│──▢│Validate│──▢│Transform│──▢│  Load  │──▢│Notify β”‚ β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  Data  β”‚  β”‚  Data  β”‚  β”‚  Data  β”‚  β”‚  Data  β”‚  β”‚ Users β”‚ β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚   β”‚   β”‚
β”‚  β”‚  β”‚       β”‚           β”‚           β”‚           β”‚           β”‚      β”‚   β”‚   β”‚
β”‚  β”‚  β”‚       β–Ό           β–Ό           β–Ό           β–Ό           β–Ό      β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β” β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚Source  β”‚  β”‚Data    β”‚  β”‚Spark/  β”‚  β”‚Target  β”‚  β”‚Email/ β”‚ β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚Tables  β”‚  β”‚Quality β”‚  β”‚Snowparkβ”‚  β”‚Tables  β”‚  β”‚Slack  β”‚ β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚   β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                             β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Architecture Diagram 3: dbt Integration Architecture

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    DBT-SNOWFLAKE INTEGRATION ARCHITECTURE                    β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                             β”‚
β”‚  DBT PROJECT STRUCTURE                                                      β”‚
β”‚  ═════════════════════                                                      β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                                                                      β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚
β”‚  β”‚  β”‚  my_dbt_project/                                             β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”œβ”€β”€ dbt_project.yml                                         β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”œβ”€β”€ profiles.yml                                            β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”œβ”€β”€ models/                                                  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚   β”œβ”€β”€ staging/                                             β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚   β”‚   β”œβ”€β”€ stg_raw_sales.sql                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚   β”‚   β”œβ”€β”€ stg_raw_customers.sql                           β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚   β”‚   └── _staging__models.yml                            β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚   β”œβ”€β”€ intermediate/                                        β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚   β”‚   β”œβ”€β”€ int_sales_with_customers.sql                    β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚   β”‚   └── _intermediate__models.yml                       β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚   └── marts/                                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚       β”œβ”€β”€ fct_orders.sql                                  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚       β”œβ”€β”€ dim_customers.sql                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚       └── _marts__models.yml                              β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”œβ”€β”€ tests/                                                   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚   β”œβ”€β”€ generic/                                             β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚   └── singular/                                            β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”œβ”€β”€ macros/                                                  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚   └── generate_schema_name.sql                            β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  └── seeds/                                                   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚      └── country_codes.csv                                   β”‚   β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                    β”‚                                        β”‚
β”‚                                    β”‚  dbt Commands                          β”‚
β”‚                                    β–Ό                                        β”‚
β”‚  DBT EXECUTION                                                               β”‚
β”‚  ══════════════                                                             β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                                                                      β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚
β”‚  β”‚  β”‚  dbt run: Execute models                                     β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  $ dbt run                                              β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚                                                         β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  Running with dbt=1.7.0                                  β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  Found 8 models, 3 tests, 2 sources                     β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚                                                         β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  1 of 8 START view model public.stg_raw_sales ........  β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  1 of 8 OK created view model public.stg_raw_sales     β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  2 of 8 START view model public.stg_raw_customers ....  β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  2 of 8 OK created view model public.stg_raw_customers β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  ...                                                    β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  8 of 8 OK created table model public.fct_orders       β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚                                                         β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  Done. PASS=8 WARN=0 ERROR=0 SKIP=0                    β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚   β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚
β”‚  β”‚                                                                      β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚
β”‚  β”‚  β”‚  dbt test: Run data quality tests                            β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  $ dbt test                                             β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚                                                         β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  Running with dbt=1.7.0                                  β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  Found 3 tests                                           β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚                                                         β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  1 of 3 START test not_null_orders_order_id ...........  β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  1 of 3 PASS not_null_orders_order_id                   β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  2 of 3 START test unique_orders_order_id ..............  β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  2 of 3 PASS unique_orders_order_id                     β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  3 of 3 START test relationships_orders_customer_id ...  β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  3 of 3 PASS relationships_orders_customer_id           β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚                                                         β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β”‚  Done. PASS=3 WARN=0 ERROR=0 SKIP=0                     β”‚  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚   β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                    β”‚                                        β”‚
β”‚                                    β”‚  Generated SQL                         β”‚
β”‚                                    β–Ό                                        β”‚
β”‚  SNOWFLAKE SQL EXECUTION                                                     β”‚
β”‚  ═══════════════════════                                                     β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                                                                      β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚
β”‚  β”‚  β”‚  Staging Model (stg_raw_sales.sql):                          β”‚   β”‚   β”‚
β”‚  β”‚  β”‚                                                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  with source as (                                             β”‚   β”‚   β”‚
β”‚  β”‚  β”‚      select * from {{ source('raw', 'sales') }}              β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  ),                                                           β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  renamed as (                                                 β”‚   β”‚   β”‚
β”‚  β”‚  β”‚      select                                                   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚          order_id as order_id,                                β”‚   β”‚   β”‚
β”‚  β”‚  β”‚          customer_id as customer_id,                          β”‚   β”‚   β”‚
β”‚  β”‚  β”‚          order_date as order_date,                            β”‚   β”‚   β”‚
β”‚  β”‚  β”‚          amount as order_amount,                              β”‚   β”‚   β”‚
β”‚  β”‚  β”‚          status as order_status                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚      from source                                              β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  )                                                            β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  select * from renamed                                        β”‚   β”‚   β”‚
β”‚  β”‚  β”‚                                                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  -- Compiles to:                                              β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  -- select order_id, customer_id, order_date,                β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  --        amount as order_amount, status as order_status    β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  -- from raw.sales                                            β”‚   β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚
β”‚  β”‚                                                                      β”‚   β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚   β”‚
β”‚  β”‚  β”‚  Mart Model (fct_orders.sql):                                β”‚   β”‚   β”‚
β”‚  β”‚  β”‚                                                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  with orders as (                                             β”‚   β”‚   β”‚
β”‚  β”‚  β”‚      select * from {{ ref('stg_raw_sales') }}               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  ),                                                           β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  customers as (                                               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚      select * from {{ ref('dim_customers') }}               β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  ),                                                           β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  final as (                                                   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚      select                                                   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚          orders.order_id,                                     β”‚   β”‚   β”‚
β”‚  β”‚  β”‚          orders.customer_id,                                  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚          customers.customer_name,                             β”‚   β”‚   β”‚
β”‚  β”‚  β”‚          orders.order_date,                                   β”‚   β”‚   β”‚
β”‚  β”‚  β”‚          orders.order_amount,                                 β”‚   β”‚   β”‚
β”‚  β”‚  β”‚          orders.order_status                                  β”‚   β”‚   β”‚
β”‚  β”‚  β”‚      from orders                                              β”‚   β”‚   β”‚
β”‚  β”‚  β”‚      left join customers on orders.customer_id =             β”‚   β”‚   β”‚
β”‚  β”‚  β”‚                              customers.customer_id           β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  )                                                            β”‚   β”‚   β”‚
β”‚  β”‚  β”‚  select * from final                                          β”‚   β”‚   β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                             β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

External Functions enable calling external APIs (REST, gRPC, custom logic) from SQL queries. They run as managed services outside Snowflake but integrate seamlessly, enabling data enrichment, external API calls, and integration with third-party systems without leaving Snowflake.

Snowpark Container Services run custom containers (Docker) directly on Snowflake, enabling any language, library, or runtime (ML models, ETL tools, custom applications) within Snowflake's managed infrastructure without external hosting.

Use Snowpark Container Services for ML model serving and custom runtimes. Use External Functions for lightweight API integrations. Use External Tables with streams for near-real-time data lake ingestion.

  • External Functions: Call REST APIs, custom logic from SQL
  • Snowpark Containers: Run any Docker container on Snowflake infrastructure
  • External Tables: Query cloud storage files directly (Parquet, JSON, ORC)
  • Data sharing: Zero-copy sharing across Snowflake accounts
  • Stream processing: Real-time ingestion via streams + tasks + external functions

Detailed Explanation

Spark Integration

Apache Spark integration with Snowflake enables large-scale data processing using Spark's distributed compute capabilities while leveraging Snowflake's managed data platform. The Snowflake Connector for Spark provides native read/write capabilities, predicate pushdown, and schema inference. This integration is ideal for complex transformations that benefit from Spark's in-memory processing or require Spark-specific libraries (MLlib, GraphX).

The connector supports pushdown predicates, where Spark filters are translated to Snowflake SQL and executed during data retrieval. This optimization reduces data transfer between Snowflake and Spark, improving performance for filtered queries. Partition pruning leverages Snowflake's micro-partition metadata to eliminate irrelevant data during reads.

Data loading uses Snowflake's COPY command for efficient bulk loading, with configurable parallelism and file formats. The connector automatically creates temporary staging tables and manages the loading process. For small datasets, direct INSERT operations may be more efficient than COPY loading.

Airflow Integration

Apache Airflow orchestrates Snowflake data pipelines through operators, hooks, and sensors. The SnowflakeOperator executes SQL statements with support for parameterized queries, transactions, and multi-statement execution. The operator handles connection management, error handling, and result processing.

Transfer operators enable data movement between Snowflake and cloud storage (S3, Azure Blob, GCS). The SnowflakeToS3Operator exports query results to cloud storage in various formats (Parquet, CSV, JSON), while S3ToSnowflakeOperator imports data from cloud storage using COPY commands. These operators are essential for building data lakes and integrating with external systems.

Hooks provide low-level connection management and SQL execution capabilities. The SnowflakeHook manages authentication, session handling, and cursor operations. Hooks can be used in custom operators, sensors, and Python functions for advanced pipeline requirements.

dbt Integration

dbt (data build tool) provides a SQL-based transformation framework that integrates natively with Snowflake. dbt models are SQL SELECT statements that are materialized as views or tables in Snowflake. The framework handles dependency management, incremental processing, testing, and documentation generation.

Staging models clean and standardize raw data, applying naming conventions and data type conversions. Intermediate models combine staging models into business-ready datasets. Mart models create final analytical tables optimized for reporting and dashboards. This layered approach improves maintainability and data quality.

dbt tests validate data quality by checking for nulls, uniqueness, referential integrity, and custom business rules. Tests are executed as SQL queries that return failing rows, enabling rapid identification of data quality issues. Test results are stored in Snowflake for reporting and alerting.

Integration Patterns

ELT Pattern: Extract data to Snowflake, load into raw tables, then transform using dbt or Spark. This pattern leverages Snowflake's compute for transformations while maintaining raw data for auditing.

Real-time Integration: Use Snowpipe for continuous data loading, combined with Streams and Tasks for real-time processing. Airflow or custom applications orchestrate the pipeline.

Hybrid Processing: Use Snowpark for transformations that can execute within Snowflake, and Spark for complex transformations requiring external libraries. This pattern optimizes compute usage while maintaining flexibility.

Key Concepts Table

IntegrationPrimary UseStrengthsLimitations
SparkLarge-scale processingML libraries, complex transformsRequires Spark cluster
AirflowOrchestrationRich operators, schedulingSteep learning curve
dbtSQL transformationsVersion control, testingSQL-only transformations
ComponentPurposeConfiguration
Spark ConnectorData read/writeSnowflake connection options
Airflow OperatorTask executionSQL, parameters, connections
dbt ModelData transformationSQL, Jinja templating
MetricSparkAirflowdbt
Setup ComplexityHighMediumLow
Learning CurveHighMediumLow
FlexibilityVery HighHighMedium
PerformanceVery HighN/AHigh

Code Examples

-- Example 1: Spark-Snowflake read
-- PySpark code
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SnowflakeRead") \
    .config("spark.jars.packages", 
            "net.snowflake:spark-snowflake_2.12:2.13.0-spark_3.3") \
    .getOrCreate()

# Read from Snowflake
df = spark.read \
    .format("snowflake") \
    .option("url", "myaccount.snowflakecomputing.com") \
    .option("user", "myuser") \
    .option("password", "mypassword") \
    .option("db", "ANALYTICS_DB") \
    .option("schema", "PUBLIC") \
    .option("warehouse", "ANALYTICS_WH") \
    .option("dbtable", "sales_data") \
    .load()

# Transform and write back
transformed_df = df.filter("amount > 1000") \
    .groupBy("region") \
    .agg({"amount": "sum"})

transformed_df.write \
    .format("snowflake") \
    .option("dbtable", "sales_summary") \
    .mode("overwrite") \
    .save()
# Example 2: Airflow DAG for Snowflake pipeline
from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email': ['data-team@company.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG('snowflake_etl_pipeline',
         default_args=default_args,
         schedule_interval='@daily',
         catchup=False) as dag:

    extract_data = SnowflakeOperator(
        task_id='extract_raw_data',
        sql="""
            INSERT INTO raw_sales 
            SELECT * FROM staging_sales 
            WHERE load_date = '{{ ds }}'
        """,
        snowflake_conn_id='snowflake_default',
        warehouse='ETL_WH',
        database='RAW_DB',
        schema='PUBLIC'
    )

    validate_data = SnowflakeOperator(
        task_id='validate_data',
        sql="""
            SELECT COUNT(*) as row_count 
            FROM raw_sales 
            WHERE load_date = '{{ ds }}'
        """,
        snowflake_conn_id='snowflake_default'
    )

    transform_data = SnowflakeOperator(
        task_id='transform_data',
        sql="transform_query.sql",
        parameters={'execution_date': '{{ ds }}'},
        snowflake_conn_id='snowflake_default'
    )

    extract_data >> validate_data >> transform_data
-- Example 3: dbt model (staging/stg_orders.sql)
with source as (
    select * from {{ source('raw', 'orders') }}
),

renamed as (
    select
        order_id,
        customer_id,
        order_date,
        amount as order_amount,
        status as order_status,
        created_at,
        updated_at
    from source
)

select * from renamed

-- Example 4: dbt model (marts/fct_orders.sql)
with orders as (
    select * from {{ ref('stg_orders') }}
),

customers as (
    select * from {{ ref('dim_customers') }}
),

final as (
    select
        orders.order_id,
        orders.customer_id,
        customers.customer_name,
        customers.customer_segment,
        orders.order_date,
        orders.order_amount,
        orders.order_status,
        DATE_TRUNC('month', orders.order_date) as order_month
    from orders
    left join customers 
        on orders.customer_id = customers.customer_id
)

select * from final
-- Example 5: Airflow custom operator for Snowflake
from airflow.models import BaseOperator
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.utils.decorators import apply_defaults

class SnowflakeDataQualityOperator(BaseOperator):
    
    @apply_defaults
    def __init__(
        self,
        sql,
        expected_result,
        snowflake_conn_id='snowflake_default',
        *args, **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.sql = sql
        self.expected_result = expected_result
        self.snowflake_conn_id = snowflake_conn_id
    
    def execute(self, context):
        hook = SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id)
        result = hook.get_first(self.sql)
        
        if result[0] != self.expected_result:
            raise ValueError(
                f"Data quality check failed: {result[0]} != {self.expected_result}"
            )
        
        return result[0]

# Usage in DAG
quality_check = SnowflakeDataQualityOperator(
    task_id='check_row_count',
    sql="SELECT COUNT(*) FROM fct_orders WHERE order_date = CURRENT_DATE()",
    expected_result=1000,
    snowflake_conn_id='snowflake_default'
)
-- Example 6: dbt test (tests/not_null_order_id.sql)
SELECT *
FROM {{ ref('fct_orders') }}
WHERE order_id IS NULL

-- Example 7: dbt source definition (models/sources.yml)
version: 2

sources:
  - name: raw
    database: RAW_DB
    schema: PUBLIC
    tables:
      - name: orders
        loaded_at_field: updated_at
        freshness:
          warn_after: {count: 12, period: hour}
          error_after: {count: 24, period: hour}
      - name: customers
        columns:
          - name: customer_id
            tests:
              - unique
              - not_null

Performance Metrics

MetricSparkAirflowdbt
Setup Time2-4 hours1-2 hours30 min
Learning Curve2-4 weeks1-2 weeks3-5 days
Pipeline SpeedVery FastModerateFast
MaintenanceHighMediumLow

Best Practices

  1. Choose the right tool: Use Spark for complex transformations requiring ML libraries, Airflow for orchestration, and dbt for SQL transformations.

  2. Use Snowpark when possible: Prefer Snowpark over Spark for transformations that can execute within Snowflake to minimize data movement.

  3. Implement incremental processing: Use dbt incremental models and Airflow backfill capabilities to process only new data.

  4. Test thoroughly: Implement data quality tests in dbt and validation checks in Airflow to catch issues early.

  5. Monitor pipelines: Track pipeline execution times, error rates, and resource usage for optimization.

  6. Version control: Use Git for all pipeline code, including dbt models, Airflow DAGs, and Spark applications.

  7. Document dependencies: Maintain clear documentation of data lineage, dependencies, and transformation logic.

  8. Optimize performance: Use appropriate partitioning, clustering, and warehouse sizing for each pipeline component.

  9. Implement error handling: Add retry logic, alerting, and fallback mechanisms for production pipelines.

  10. Regular reviews: Conduct weekly pipeline reviews to identify optimization opportunities and address issues proactively.


See Also

Advertisement

Need Expert Snowflake Help?

Get personalized warehouse optimization, data modeling, or Snowflake platform consulting.

Advertisement