Snowpark Python: UDFs, Stored Procedures & DataFrames
Architecture Diagram 1: Snowpark Architecture
```text
SNOWPARK PYTHON ARCHITECTURE
============================

CLIENT APPLICATION
------------------
  Python code (local development):

    from snowflake.snowpark import Session
    from snowflake.snowpark.functions import col, sum as sum_

    # Create session
    session = Session.builder.configs({
        "account": "your_account",
        "user": "your_user",
        "password": "your_password",
    }).create()

    # DataFrame operations (lazy until collect())
    df = session.table("sales_data")
    result = (
        df.filter(col("amount") > 1000)
          .group_by("region")
          .agg(sum_(col("amount")).alias("total"))
          .collect()
    )

          │
          │  Snowpark client library (translates Python to SQL)
          ▼

SNOWPARK CLIENT LIBRARY
-----------------------
  Translation layer: Python DataFrame operations → SQL

    df.filter(col("amount") > 1000)     →  WHERE amount > 1000
    df.group_by("region")               →  GROUP BY region
    .agg(sum_(col("amount")))           →  SUM(amount)

          │
          │  Generated SQL
          ▼

SNOWFLAKE COMPUTE (Virtual Warehouse)
-------------------------------------
  SQL execution engine:

    SELECT region, SUM(amount) AS total
    FROM sales_data
    WHERE amount > 1000
    GROUP BY region;

  Query optimization (performed by Snowflake's optimizer):
    • Predicate pushdown
    • Column pruning
    • Join optimization
    • Partition pruning (micro-partitions)

  Execution plan:
    1. Table scan (sales_data)
    2. Filter (amount > 1000)
    3. Hash aggregate (GROUP BY region)
    4. Sort (optional)
    5. Return results

          │
          │  Results
          ▼

RETURN TO CLIENT
----------------
  Result (example):

    REGION | TOTAL
    -------+----------
    US     | 1,500,000
    EU     |   800,000
    APAC   |   600,000

  Return format:
    • collect()    → list of Row objects
    • to_pandas()  → pandas DataFrame
    • show()       → prints formatted output
```
Architecture Diagram 2: UDF Architecture
```text
SNOWPARK UDF ARCHITECTURE
=========================

UDF DEFINITION
--------------
  from snowflake.snowpark.functions import udf
  from snowflake.snowpark.types import FloatType

  @udf(name="calculate_discount",
       return_type=FloatType(),
       input_types=[FloatType(), FloatType()],
       packages=["pandas"],
       imports=["utils/discount_logic.py"])
  def calculate_discount(price: float, discount_pct: float) -> float:
      return price * (discount_pct / 100)

          │
          │  Register UDF
          ▼

UDF REGISTRATION
----------------
  UDF metadata:
    • Name: calculate_discount
    • Language: Python
    • Return type: FLOAT
    • Input types: [FLOAT, FLOAT]
    • Handler: calculate_discount
    • Packages: pandas
    • Imports: utils/discount_logic.py

  Runtime environment:
    • Sandboxed Python runtime on the warehouse nodes
    • Third-party packages resolved from the Snowflake Anaconda channel (packages=)
    • Your own modules and files uploaded via imports=
    • Loaded runtime and modules reused for subsequent batches

          │
          │  Call UDF in SQL
          ▼

UDF EXECUTION
-------------
  SQL query:

    SELECT
      product,
      price,
      discount_pct,
      calculate_discount(price, discount_pct) AS discount_amount
    FROM products;

  Execution flow:
    1. Query parsing
       └─ Identify UDF call: calculate_discount
    2. UDF resolution
       └─ Look up UDF metadata and handler
    3. Runtime initialization (if needed)
       └─ Load Python runtime and dependencies
    4. Batch processing
       └─ Rows are sent to the Python process in batches; a scalar handler
          is still called once per row, a vectorized handler once per batch
    5. Return results
       └─ One output value per input row is returned to the query

          │
          ▼

UDF TYPES
---------
  1. SCALAR UDF
     • Handler called once per row
     • Returns a single value per row
     • Can be used in SELECT, WHERE, GROUP BY
     • Example: calculate_discount(price, pct)

  2. TABLE UDF (UDTF)
     • Handler class whose process() method is called once per input row
     • Returns zero or more rows per input row
     • Called in the FROM clause with TABLE(...), often laterally joined
     • Example: parse_json_to_rows(json_string)

  3. AGGREGATE UDF (UDAF)
     • Accumulates state across all rows of a group
     • Returns a single aggregated value per group
     • Used with GROUP BY
     • Example: custom_aggregate(values)
```
Architecture Diagram 3: Stored Procedures Architecture
```text
SNOWPARK STORED PROCEDURES ARCHITECTURE
=======================================

PROCEDURE DEFINITION
--------------------
  from snowflake.snowpark import Session
  from snowflake.snowpark.functions import col, lit, sproc
  from snowflake.snowpark.types import StringType

  @sproc(name="process_data",
         return_type=StringType(),
         input_types=[StringType()],
         packages=["snowflake-snowpark-python", "pandas", "numpy"],
         imports=["utils/processing.py"])
  def process_data(session: Session, table_name: str) -> str:
      # Access data using Snowpark (lazy; nothing runs yet)
      df = session.table(table_name)

      # Transform data
      processed = (df.filter(col("status") == "active")
                     .with_column("processed", lit(True)))

      # Write results inside Snowflake; no collect() round trip to Python
      processed.write.mode("overwrite").save_as_table("processed_data")

      return f"Processed {session.table('processed_data').count()} rows"

          │
          │  Register & execute
          ▼

PROCEDURE EXECUTION
-------------------
  CALL process_data('raw_data');

  Execution flow:
    1. Procedure call received
       └─ Parse call and validate parameters
    2. Session creation
       └─ Create Snowpark session with procedure context
    3. Python runtime initialization
       └─ Load Python interpreter and dependencies
    4. Procedure execution
       └─ Execute procedure body with session context
    5. Return result
       └─ Return scalar value or result set

          │
          ▼

SESSION CONTEXT
---------------
  Session object capabilities:

  Data access:
    • session.table("table_name")
    • session.sql("SELECT ...")
    • session.read.parquet("@my_stage/path/")

  Data writing:
    • df.write.mode("overwrite").save_as_table("table")
    • df.write.mode("append").save_as_table("table")
    • df.write.copy_into_location("@my_stage/path/")

  Session management:
    • session.get_current_database()
    • session.get_current_schema()
    • session.use_database("db")
    • session.use_schema("schema")

          │
          ▼

RETURN VALUE HANDLING
---------------------
  Return types:

  1. SCALAR VALUE
     • return_type=StringType()
     • return_type=IntegerType()
     • return_type=FloatType()
     • Example: return "Processed 1000 rows"

  2. RESULT SET (tabular)
     • return_type=StructType([...]) describing the output columns
     • Return a Snowpark DataFrame
     • Example: return df

  3. NULL VALUE
     • return None
     • The CALL returns NULL (no output)
```
A Snowpark DataFrame is a lazily evaluated abstraction that represents a query plan. Transformations (select, filter, join, group_by) only extend the plan; nothing executes until an action is invoked (collect, count, show, to_pandas, or a write such as save_as_table). At that point the client library translates the plan into SQL, which Snowflake optimizes and runs on a virtual warehouse.
A Snowpark Session is the entry point for all Snowpark operations. It holds the authenticated connection to Snowflake together with the current role, warehouse, database, and schema, and provides methods to create DataFrames, register UDFs and stored procedures, and run SQL (including transaction statements such as BEGIN and COMMIT). Each session represents a single authenticated connection.
Use Snowpark for ETL pipelines (DataFrame transformations), ML feature engineering, data quality validation, and other complex data processing. Use UDFs for per-row logic inside queries and stored procedures for multi-step batch workflows. Use cache_result() to materialize a DataFrame that several actions reuse, so its query runs once instead of once per action.
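The deferred-execution idea can be modelled in a few lines of plain Python. This is a toy sketch, not the Snowpark API: LazyFrame and recording_executor are hypothetical names, and the executor stands in for "compile the plan to SQL and run it on a warehouse". Transformations only return a new frame with a longer plan; the executor is called exactly once, when the action runs.

```python
from typing import Callable, List, Tuple

Step = Tuple[str, tuple]


class LazyFrame:
    """Toy model of a lazy DataFrame (hypothetical; not the Snowpark API)."""

    def __init__(self, source: str,
                 executor: Callable[[str, Tuple[Step, ...]], list],
                 steps: Tuple[Step, ...] = ()) -> None:
        self.source = source
        self.executor = executor
        self.steps = steps

    def _add(self, step: Step) -> "LazyFrame":
        # Transformations return a new frame with a longer plan; nothing runs.
        return LazyFrame(self.source, self.executor, self.steps + (step,))

    def filter(self, condition: str) -> "LazyFrame":
        return self._add(("filter", (condition,)))

    def select(self, *columns: str) -> "LazyFrame":
        return self._add(("select", columns))

    def collect(self) -> list:
        # The action: the whole plan is handed to the executor in one call.
        return self.executor(self.source, self.steps)


# Recording executor standing in for "translate to SQL and execute".
calls: List[tuple] = []


def recording_executor(source: str, steps: Tuple[Step, ...]) -> list:
    calls.append((source, steps))
    return []


frame = (LazyFrame("sales_data", recording_executor)
         .filter("amount > 1000")
         .select("region", "amount"))
print(len(calls))  # 0: building the plan executed nothing
frame.collect()
print(len(calls))  # 1: one action, one execution of the full plan
```

In Snowpark the same split holds: chaining filter() and select() is cheap and local, and each action sends the accumulated plan to Snowflake as SQL.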
- Lazy evaluation: Operations build plans; execution only on actions
- SQL translation: DataFrame operations compile to optimized SQL
- Multi-language: Python, Java, Scala support with uniform DataFrame API
- ML integration: scikit-learn, XGBoost, PyTorch via Snowpark ML
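- Cost model: same as regular queries; warehouse credits per hour (set by warehouse size) × running time, billed per second with a 60-second minimum each time a warehouse resumes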
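The cost bullet can be turned into a quick estimate. This is a simplified sketch under stated assumptions: it uses the standard-warehouse rates (1 credit per hour for X-Small, doubling with each size), treats one run as a single resume-to-suspend period, and ignores cloud-services credits, multi-cluster scaling, and queries sharing an already-running warehouse. estimate_credits is a hypothetical helper name.

```python
# Credits per hour for standard warehouses; each size up doubles the rate.
CREDITS_PER_HOUR = {
    "XSMALL": 1,
    "SMALL": 2,
    "MEDIUM": 4,
    "LARGE": 8,
    "XLARGE": 16,
}


def estimate_credits(size: str, running_seconds: float) -> float:
    """Credits for one warehouse run from resume to suspend (simplified).

    Per-second billing with a 60-second minimum per resume; cloud-services
    credits and multi-cluster scaling are not modelled.
    """
    billed_seconds = max(running_seconds, 60)
    return CREDITS_PER_HOUR[size.upper()] * billed_seconds / 3600


print(estimate_credits("LARGE", 45 * 60))  # 45 minutes on Large: 6.0 credits
print(estimate_credits("XSMALL", 10))      # billed as 60 s: about 0.0167 credits
```

The second call shows why many tiny runs on a suspended warehouse cost more than their runtime suggests: each resume is billed for at least a minute.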
Detailed Explanation
Snowpark Fundamentals
Snowpark is Snowflake's developer framework for writing data-intensive applications in Python, Java, and Scala. It enables developers to write code that executes entirely within Snowflake, leveraging the platform's compute resources while using familiar programming languages and paradigms. Snowpark eliminates the need to extract data for processing in external systems, reducing data movement, security risks, and complexity.
The core abstraction in Snowpark is the DataFrame, which represents a lazily evaluated query over data stored in Snowflake. DataFrame operations (filter, join, group_by, agg) are translated into SQL, and Snowflake's optimizer turns that SQL into a parallel execution plan inside the compute layer. This translation happens automatically, allowing developers to write Pythonic code while benefiting from Snowflake's query optimization and parallel execution capabilities.
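The translation step can be illustrated with a toy compiler that folds DataFrame-style steps into a single SQL statement, reproducing the query in the architecture diagram. It is a hypothetical, simplified sketch (compile_plan is not a Snowpark function): it supports only filter, group_by, and agg and assumes filters precede the aggregation. The SQL Snowpark actually generates is often more nested; it can be inspected on a real DataFrame with df.queries or df.explain().

```python
from typing import Sequence, Tuple


def compile_plan(table: str, steps: Sequence[Tuple[str, tuple]]) -> str:
    """Toy translation of DataFrame-style steps into one SQL statement.

    Hypothetical and simplified: supports filter, group_by and agg only,
    and assumes filters come before the aggregation.
    """
    where, group_cols, select_cols = [], [], ["*"]
    for kind, args in steps:
        if kind == "filter":
            where.extend(args)            # successive filters are ANDed
        elif kind == "group_by":
            group_cols = list(args)
        elif kind == "agg":
            select_cols = group_cols + list(args)
        else:
            raise ValueError(f"unsupported step: {kind}")
    sql = f"SELECT {', '.join(select_cols)} FROM {table}"
    if where:
        # Parenthesize each condition so ORs inside one filter stay grouped.
        sql += " WHERE " + " AND ".join(f"({c})" for c in where)
    if group_cols:
        sql += " GROUP BY " + ", ".join(group_cols)
    return sql


print(compile_plan("sales_data", [
    ("filter", ("amount > 1000",)),
    ("group_by", ("region",)),
    ("agg", ("SUM(amount) AS total",)),
]))
```

Because the whole chain becomes one statement, Snowflake's optimizer sees every predicate at once and can push filters down and prune columns before scanning.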
UDF Implementation Patterns
User-defined functions (UDFs) in Snowpark enable custom data processing logic that extends Snowflake's built-in function library. Scalar UDFs process individual rows and return single values, while user-defined table functions (UDTFs) can return multiple rows per input row. Once registered with Snowflake, UDFs can be used in SQL queries like built-in functions.
UDF performance depends on several factors: the complexity of the function logic, the cost of any per-call setup in the Python handler, and the volume of data processed. For optimal performance, keep UDF logic simple, avoid unnecessary package dependencies, and consider vectorized (pandas) UDFs for large datasets. Snowflake already sends rows to the Python process in batches, which reduces per-row overhead, but a scalar handler is still invoked once per row.
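Snowpark UDTF handlers are classes: process() is called once per input row and may yield zero or more rows, and an optional end_partition() is called once after the last row of each partition, which suits per-group summaries. The handler below is a hypothetical example of that pattern; because it is plain Python, it can be exercised locally without a connection. Registration (for example session.udtf.register(SalesWithTotal, output_schema=StructType([...]), input_types=[...])) and partitioning in SQL (TABLE(f(...) OVER (PARTITION BY ...))) are not shown or tested here.

```python
from typing import Iterable, Tuple


class SalesWithTotal:
    """Hypothetical UDTF handler: echoes each sale, then emits a partition total."""

    def __init__(self) -> None:
        # Snowflake creates a handler instance per partition, so state is per partition.
        self.total = 0.0

    def process(self, product: str, amount: float) -> Iterable[Tuple[str, float]]:
        # Called once per input row; may yield zero or more output rows.
        self.total += amount
        yield (product, amount)

    def end_partition(self) -> Iterable[Tuple[str, float]]:
        # Called once after the last row of the partition.
        yield ("TOTAL", self.total)


# Local simulation of one partition (Snowflake drives these calls in production).
handler = SalesWithTotal()
rows = [out
        for product, amount in [("apple", 2.0), ("pear", 3.5)]
        for out in handler.process(product, amount)]
rows += list(handler.end_partition())
print(rows)  # [('apple', 2.0), ('pear', 3.5), ('TOTAL', 5.5)]
```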
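A vectorized UDF receives a whole batch of rows as pandas objects instead of one row per call, so arithmetic runs once per batch. The handler below is a hypothetical batch version of calculate_discount from the UDF diagram; in Snowpark it could be registered with pandas_udf() from snowflake.snowpark.functions, using PandasSeriesType(FloatType()) for the input and return types. Only the handler logic is shown, so the sketch runs locally with pandas alone.

```python
import pandas as pd


def calculate_discount_batch(price: pd.Series, discount_pct: pd.Series) -> pd.Series:
    """Hypothetical vectorized handler: one call per batch, not per row."""
    # Element-wise arithmetic over the whole batch in a single operation.
    return price * (discount_pct / 100)


# Simulate one batch of three rows.
batch_prices = pd.Series([100.0, 250.0, 80.0])
batch_pcts = pd.Series([10.0, 20.0, 0.0])
discounts = calculate_discount_batch(batch_prices, batch_pcts)
print(discounts.tolist())
```

The design trade-off is that the handler must work on whole Series, so row-by-row Python control flow (if/else per row) has to be rewritten with vectorized pandas operations.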
Stored Procedures for Complex Workflows
Snowpark stored procedures enable complex data processing workflows that combine SQL operations, DataFrame transformations, and procedural logic. Unlike UDFs, stored procedures can access the Snowpark Session object, enabling dynamic SQL execution, data reading and writing, and multi-step transformations.
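When a procedure builds SQL or table references from its arguments (as process_data does with table_name), validating identifiers first guards against SQL injection. The helper below is a hypothetical sketch that accepts only unquoted Snowflake identifiers (a letter or underscore, then letters, digits, underscores, or dollar signs), optionally qualified as schema.table or db.schema.table. It is deliberately stricter than Snowflake: quoted identifiers are rejected.

```python
import re

# Unquoted Snowflake identifier: a letter or underscore, then letters,
# digits, underscores, or dollar signs.
_UNQUOTED_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*")


def validate_table_name(name: str) -> str:
    """Hypothetical helper: accept table, schema.table, or db.schema.table."""
    parts = name.split(".")
    # fullmatch (not match with ^...$) so a trailing newline cannot slip through.
    if not 1 <= len(parts) <= 3 or not all(_UNQUOTED_IDENT.fullmatch(p) for p in parts):
        raise ValueError(f"Unsafe table name: {name!r}")
    return name


# Inside a procedure: df = session.table(validate_table_name(table_name))
print(validate_table_name("analytics_db.public.raw_data"))
```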
Stored procedures are particularly useful for ETL processes, data quality checks, and complex business logic. They can read from multiple tables, apply transformations using DataFrame operations, and write results to target tables. Inside the procedure, the Session object can issue transaction statements (BEGIN, COMMIT, ROLLBACK) through session.sql(), while ordinary Python try/except blocks handle errors.
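A minimal sketch of wrapping DML in an explicit transaction from a Python stored procedure, assuming the statements are DML (DELETE, INSERT, MERGE). DDL such as CREATE TABLE, including save_as_table(mode="overwrite"), commits implicitly in Snowflake and would end the transaction early. run_in_transaction and RecordingSession are hypothetical names; the stub only records SQL so the commit/rollback control flow can be checked without a connection.

```python
from typing import Iterable, List, Optional


def run_in_transaction(session, statements: Iterable[str]) -> None:
    """Run DML statements atomically through session.sql().

    `session` is a snowflake.snowpark.Session in a real procedure; any object
    with a compatible sql(...).collect() interface works for local testing.
    """
    session.sql("BEGIN").collect()
    try:
        for statement in statements:
            session.sql(statement).collect()
        session.sql("COMMIT").collect()
    except Exception:
        # Undo every statement issued since BEGIN, then re-raise.
        session.sql("ROLLBACK").collect()
        raise


class RecordingSession:
    """Hypothetical test double that records SQL instead of running it."""

    def __init__(self, fail_on: Optional[str] = None) -> None:
        self.log: List[str] = []
        self.fail_on = fail_on

    def sql(self, query: str) -> "RecordingSession":
        self.log.append(query)
        if query == self.fail_on:
            raise RuntimeError(f"simulated failure in: {query}")
        return self

    def collect(self) -> list:
        return []


ok = RecordingSession()
run_in_transaction(ok, ["DELETE FROM target WHERE load_date = CURRENT_DATE()",
                        "INSERT INTO target SELECT * FROM staging"])
print(ok.log)
```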
Session Management and Configuration
Snowpark sessions manage the connection between Python code and Snowflake compute resources. Sessions can be configured with account credentials, warehouse assignments, and database/schema contexts. The session object provides methods for executing SQL, reading and writing data, and managing the execution environment.
Session configuration affects performance and cost. Choosing the right warehouse size, setting appropriate timeouts, and managing the session lifecycle (closing sessions with session.close() when done) are important considerations. For production workloads, use dedicated service accounts, keep credentials out of source code, reuse long-lived sessions rather than opening a new one per task, and monitor session usage.
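One way to keep credentials out of source code is to build the Session.builder.configs() dictionary from environment variables. The sketch below assumes hypothetical variable names (SNOWFLAKE_ACCOUNT and so on); adapt them to your secrets setup. It uses password authentication only to mirror the examples in this document; for service accounts, key-pair authentication is generally preferred.

```python
import os
from typing import Dict, Mapping

# Hypothetical variable names; adapt them to your deployment.
REQUIRED_VARS = {
    "account": "SNOWFLAKE_ACCOUNT",
    "user": "SNOWFLAKE_USER",
    "password": "SNOWFLAKE_PASSWORD",
}
OPTIONAL_VARS = {
    "role": "SNOWFLAKE_ROLE",
    "warehouse": "SNOWFLAKE_WAREHOUSE",
    "database": "SNOWFLAKE_DATABASE",
    "schema": "SNOWFLAKE_SCHEMA",
}


def connection_params(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Build a Session.builder.configs() dict from environment variables."""
    missing = [var for var in REQUIRED_VARS.values() if not env.get(var)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    params = {key: env[var] for key, var in REQUIRED_VARS.items()}
    # Optional context keys are included only when set.
    params.update({key: env[var] for key, var in OPTIONAL_VARS.items() if env.get(var)})
    return params


# With snowflake-snowpark-python installed:
#     from snowflake.snowpark import Session
#     session = Session.builder.configs(connection_params()).create()
#     ...
#     session.close()
```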
Best Practices for Snowpark Development
Effective Snowpark development requires understanding how Python code translates into SQL. Optimize DataFrame operations by filtering early and selecting only the columns you need, which lets Snowflake prune micro-partitions and columns, and avoid pulling data to the client with collect() or to_pandas() until the final step. Test code with representative data volumes to identify performance bottlenecks.
For UDFs, keep function logic simple and avoid expensive per-row operations. Consider vectorized UDFs for batch processing, and load expensive resources (models, lookup tables) once per Python process rather than once per row. For stored procedures, implement proper error handling, use transactions for data consistency, and monitor execution metrics.
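The "load once, reuse per row" advice can be sketched with functools.lru_cache: the first call pays the setup cost and later calls in the same Python process reuse the result. load_tax_rates and tax_for_region are hypothetical names, and the rates are illustrative. In a real UDF the loader might read a file shipped with imports=, located via sys._xoptions["snowflake_import_directory"]; that part is assumed and not shown.

```python
import functools

load_count = 0  # only here to show how often the loader runs


@functools.lru_cache(maxsize=None)
def load_tax_rates() -> dict:
    """Hypothetical expensive setup, e.g. parsing a file shipped via imports=."""
    global load_count
    load_count += 1
    return {"US": 8.0, "EU": 20.0}  # illustrative rates, not real data


def tax_for_region(amount: float, region: str) -> float:
    """Scalar UDF handler: reuses the cached table on every row after the first."""
    rates = load_tax_rates()
    return amount * rates.get(region, 0.0) / 100


print([tax_for_region(a, r) for a, r in [(100.0, "US"), (50.0, "EU"), (10.0, "APAC")]])
print(load_count)  # 1: the table was loaded once for all three rows
```

Snowflake may run several Python processes in parallel, so each process loads the table once; the cache saves per-row work, not per-process work.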
Key Concepts Table
| Component | Purpose | Execution | Use Case |
|---|---|---|---|
| DataFrame | Distributed dataset | SQL translation | Data manipulation |
| UDF | Custom function | Row/batch processing | Data transformation |
| UDTF | Table function | Row processing | Data expansion |
| Stored Procedure | Complex workflow | Full Python runtime | ETL, business logic |
| Return Type | Usage | Example |
|---|---|---|
| StringType | Text results | "Processed 100 rows" |
| IntegerType | Numeric results | 42 |
| FloatType | Decimal results | 3.14 |
| BooleanType | True/false | True |
| StructType (tabular return) | Result sets | return df (a Snowpark DataFrame) |
| Session Method | Purpose | Example |
|---|---|---|
| table() | Read table | session.table("sales") |
| sql() | Execute SQL | session.sql("SELECT 1") |
| write (a DataFrame property) | Write data | df.write.save_as_table("sales_copy") |
| use_database() | Set context | session.use_database("db") |
Code Examples
```python
# Example 1: Basic DataFrame operations
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, sum as sum_, avg

# Create session (placeholder credentials; prefer environment variables
# or a secrets manager over hard-coded passwords)
session = Session.builder.configs({
    "account": "your_account",
    "user": "your_user",
    "password": "your_password",
    "role": "ANALYTICS_ROLE",
    "warehouse": "ANALYTICS_WH",
    "database": "ANALYTICS_DB",
    "schema": "PUBLIC",
}).create()

# Read table (lazy: no query runs yet)
df = session.table("sales_data")

# Filter and aggregate; collect() triggers execution
result = (
    df.filter(col("amount") > 1000)
      .group_by("region")
      .agg(
          sum_(col("amount")).alias("total_amount"),
          avg(col("amount")).alias("avg_amount"),
      )
      .collect()
)

# Show results (unquoted column names come back upper-case)
for row in result:
    print(f"Region: {row['REGION']}, Total: {row['TOTAL_AMOUNT']}")
```
```python
# Example 2: Scalar UDF
from snowflake.snowpark.functions import udf
from snowflake.snowpark.types import FloatType

# The decorator registers a temporary UDF named calculate_tax in the
# current session; no separate session.udf.register() call is needed.
@udf(name="calculate_tax",
     return_type=FloatType(),
     input_types=[FloatType(), FloatType()],
     replace=True)
def calculate_tax(amount: float, tax_rate: float) -> float:
    # tax_rate is a percentage, e.g. 8.0 for 8%
    return amount * (tax_rate / 100)

# Use UDF in query
result = session.sql("""
    SELECT product, amount,
           calculate_tax(amount, 8.0) AS tax
    FROM sales
""").collect()
```
```python
# Example 3: Table UDF (UDTF)
from snowflake.snowpark.functions import udtf
from snowflake.snowpark.types import StructType, StructField, StringType

# UDTF handlers are classes: process() is called once per input row and
# yields zero or more output rows as tuples.
@udtf(name="parse_json_object",
      output_schema=StructType([
          StructField("key", StringType()),
          StructField("value", StringType()),
      ]),
      input_types=[StringType()],
      replace=True)
class ParseJsonObject:
    def process(self, json_str: str):
        import json
        data = json.loads(json_str)
        for key, value in data.items():
            yield (key, str(value))

# Use UDTF in query
result = session.sql("""
    SELECT *
    FROM TABLE(parse_json_object('{"a": 1, "b": 2}'))
""").collect()
```
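```python
# Example 4: Stored procedure
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, sum as sum_
from snowflake.snowpark.types import StringType

def process_sales_data(session: Session, region: str) -> str:
    # Read data
    df = session.table("raw_sales")
    # Transform
    filtered = df.filter(col("region") == region.upper())
    aggregated = (filtered.group_by("product")
                          .agg(sum_(col("amount")).alias("total")))
    # Write results (region becomes part of a table name: validate it
    # first if it can come from untrusted input)
    aggregated.write.mode("overwrite").save_as_table(f"sales_{region.lower()}")
    return f"Processed {filtered.count()} rows for {region}"

# Register procedure under a name so it can be called by name;
# stored procedures need the snowflake-snowpark-python package
session.sproc.register(
    process_sales_data,
    name="process_sales_data",
    return_type=StringType(),
    input_types=[StringType()],
    packages=["snowflake-snowpark-python"],
    replace=True,
)

# Call procedure
result = session.call("process_sales_data", "US")
print(result)
```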
```python
# Example 5: Advanced DataFrame operations
from snowflake.snowpark.functions import col, concat, lit, when

# Complex transformations
df = (
    session.table("customers")
    .with_column(
        "age_group",
        when(col("age") < 25, lit("Young"))
        .when(col("age") < 50, lit("Adult"))
        .otherwise(lit("Senior")),
    )
    # Use concat() for strings; + on columns is numeric addition
    .with_column(
        "full_name",
        concat(col("first_name"), lit(" "), col("last_name")),
    )
    .select("customer_id", "full_name", "age_group", "email")
)

# Write to table
df.write.mode("overwrite").save_as_table("customer_segments")
```
```python
# Example 6: Error handling and logging
import logging

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col

logger = logging.getLogger(__name__)

def robust_etl(session: Session) -> str:
    try:
        # Read source
        source_df = session.table("source_table")

        # Validate data quality
        null_count = source_df.filter(col("id").is_null()).count()
        if null_count > 0:
            logger.warning("Found %d null IDs", null_count)

        # Transform: drop null IDs and keep one (arbitrary) row per id
        clean_df = (source_df.filter(col("id").is_not_null())
                             .drop_duplicates("id"))

        # Write, then count the written table instead of re-running clean_df
        clean_df.write.mode("overwrite").save_as_table("target_table")
        row_count = session.table("target_table").count()
        return f"Success: Processed {row_count} rows"
    except Exception as e:
        logger.error("ETL failed: %s", e)
        raise
```
Performance Metrics
| Metric | Target | Warning | Critical |
|---|---|---|---|
| DataFrame Translation Time | < 1s | 1-5s | > 5s |
| UDF Execution Time | < 100ms | 100-500ms | > 500ms |
| Stored Procedure Time | < 30s | 30-120s | > 120s |
| Data Transfer Latency | < 1s | 1-10s | > 10s |
Best Practices
- Optimize DataFrame operations: Filter early and select only the columns you need, so Snowflake can prune micro-partitions and columns and move less data.
- Keep UDFs simple: Avoid expensive per-row operations in UDFs. Use vectorized (pandas) UDFs for large datasets.
- Manage sessions properly: Use appropriate warehouse sizes and reuse long-lived sessions for production workloads instead of creating one per task.
- Handle errors gracefully: Implement try-except blocks and logging in stored procedures.
- Test with representative data: Validate performance and correctness with data volumes similar to production.
- Use appropriate return types: Choose the most efficient return type for your use case.
- Leverage built-in functions: Use Snowpark's built-in functions instead of custom UDFs when possible.
- Monitor execution metrics: Track DataFrame translation times, UDF execution times, and data transfer volumes.
- Implement transaction management: Wrap DML in explicit transactions (BEGIN, COMMIT, ROLLBACK) in stored procedures for data consistency; DDL statements commit implicitly.
- Document code thoroughly: Include comments explaining complex logic and data transformations.
See Also
- PySpark Iceberg - PySpark DataFrame patterns
- Delta Lake on Databricks - Delta Lake DataFrame API
- Data Warehouse Concepts - Data warehouse design principles