Snowflake ML Operations (MLOps)
Snowflake provides a comprehensive MLOps platform that enables end-to-end machine learning workflows directly within the data warehouse.
Feature Store
Creating Feature Tables
-- Make sure the schema that holds every feature table exists.
CREATE SCHEMA IF NOT EXISTS feature_store;

-- Build one row of aggregate features per customer from the raw orders table.
-- CREATE OR REPLACE rebuilds the table from scratch on every run.
CREATE OR REPLACE TABLE feature_store.customer_features AS
SELECT
    customer_id,

    -- Transaction features
    COUNT(*)    AS total_transactions,
    SUM(amount) AS total_spend,
    AVG(amount) AS avg_transaction_amount,
    MAX(amount) AS max_transaction_amount,

    -- Temporal features, in days relative to the current date
    DATEDIFF('day', MIN(order_date), CURRENT_DATE()) AS customer_tenure_days,
    DATEDIFF('day', MAX(order_date), CURRENT_DATE()) AS days_since_last_order,

    -- Behavioral features
    COUNT(DISTINCT DATE_TRUNC('month', order_date)) AS active_months,
    COUNT(DISTINCT product_category)                AS unique_categories_purchased
FROM orders
GROUP BY customer_id;
-- Feature set v2: every v1 column plus two derived rate features.
-- Columns are listed explicitly instead of SELECT * so the shape of the v2
-- table cannot change silently when customer_features gains or reorders columns.
CREATE OR REPLACE TABLE feature_store.customer_features_v2 AS
SELECT
    customer_id,
    total_transactions,
    total_spend,
    avg_transaction_amount,
    max_transaction_amount,
    customer_tenure_days,
    days_since_last_order,
    active_months,
    unique_categories_purchased,

    -- New in v2. NULLIF turns a zero denominator into NULL, so the rate comes
    -- out as NULL rather than failing with a division-by-zero error.
    total_spend / NULLIF(customer_tenure_days, 0) AS daily_spend_rate,
    total_transactions / NULLIF(active_months, 0) AS transactions_per_month
FROM feature_store.customer_features;
Model Training
Using ML Functions
-- Assemble the labelled training set: four numeric features per customer plus
-- a binary label.
-- NOTE(review): the label is computed from total_spend, and total_spend is
-- also kept as a feature, so a model can recover the label from that single
-- column. Confirm this is intended for the example.
CREATE OR REPLACE TABLE training_data AS
SELECT
    customer_id,
    total_spend,
    total_transactions,
    customer_tenure_days,
    days_since_last_order,

    -- Target variable: 1 when lifetime spend exceeds 1000, otherwise 0
    -- (a NULL total_spend also falls through to 0).
    CASE WHEN total_spend > 1000 THEN 1 ELSE 0 END AS high_value_customer
FROM feature_store.customer_features;
-- Train classification model using ML Functions
-- Defines a model named customer_classification that reads training_data and
-- uses high_value_customer as the label column. Later statements in this file
-- read metrics and the model binary from classification_model_output.
-- TEST_SPLIT / VALIDATION_SPLIT are presumably fractions of the input rows
-- held out for evaluation -- TODO confirm.
-- NOTE(review): this CREATE MODEL ... INPUT_TABLE / OUTPUT_TABLE form does not
-- look like the documented Snowflake ML classification syntax
-- (CREATE SNOWFLAKE.ML.CLASSIFICATION ...); verify against the Snowflake
-- documentation before running.
CREATE OR REPLACE MODEL customer_classification
INPUT_TABLE = training_data
OUTPUT_TABLE = classification_model_output
TARGET_COLUMN = 'high_value_customer'
TRAINING_PARAMETERS = {
'MAX_ITERATIONS': 100,
'TEST_SPLIT': 0.2,
'VALIDATION_SPLIT': 0.1
};
-- Review the evaluation metrics recorded for the trained model.
SELECT
    cmo.model_name,
    cmo.training_accuracy,
    cmo.validation_accuracy,
    cmo.test_accuracy,
    cmo.f1_score,
    cmo.auc_roc
FROM classification_model_output AS cmo;
Custom Model Training
-- Train a scikit-learn random forest on a Snowflake table and save the fitted
-- model to a stage.
--
-- Parameters:
--   training_table  name of a table holding the feature columns plus the
--                   high_value_customer label column
--   model_name      logical model name; also used as the file name on the stage
-- Returns:
--   VARIANT object with model_name, train_accuracy, test_accuracy and the
--   list of feature column names the model was fitted on.
-- Requires a stage named model_stage to be resolvable from the procedure.
-- NOTE(review): confirm that Python 3.8 is still an accepted runtime version
-- on the target account.
CREATE OR REPLACE PROCEDURE train_custom_model(
    training_table STRING,
    model_name STRING
)
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python', 'scikit-learn', 'pandas')
HANDLER = 'main'
AS
$$
import io
import pickle

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

TARGET_COLUMN = 'high_value_customer'
ID_COLUMN = 'customer_id'
SEED = 42


def main(session, training_table, model_name):
    """Fit a RandomForestClassifier on `training_table` and stage the pickle.

    Raises ValueError when the table is empty or lacks the label column.
    """
    df = session.table(training_table).to_pandas()

    # Unquoted Snowflake identifiers come back upper-cased (HIGH_VALUE_CUSTOMER);
    # normalise to lower case so the column lookups below match.
    df.columns = [str(col).lower() for col in df.columns]

    if TARGET_COLUMN not in df.columns:
        raise ValueError(
            f"{training_table} has no '{TARGET_COLUMN}' column to use as the label"
        )
    if df.empty:
        raise ValueError(f"{training_table} contains no rows to train on")

    y = df[TARGET_COLUMN]
    # customer_id identifies a row; it is not a predictive signal and is not
    # passed at inference time, so keep it out of the feature matrix.
    X = df.drop(columns=[TARGET_COLUMN, ID_COLUMN], errors='ignore')

    # Seed the split as well as the model so repeated runs are comparable.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=SEED
    )

    model = RandomForestClassifier(n_estimators=100, random_state=SEED)
    model.fit(X_train, y_train)

    train_score = float(model.score(X_train, y_train))
    test_score = float(model.score(X_test, y_test))

    # Stream the serialized model straight to the stage. A SQL PUT would need a
    # local file, and none is ever written by this procedure.
    session.file.put_stream(
        io.BytesIO(pickle.dumps(model)),
        f"@model_stage/{model_name}.pkl",
        auto_compress=False,
        overwrite=True,
    )

    return {
        'model_name': model_name,
        'train_accuracy': train_score,
        'test_accuracy': test_score,
        'features': list(X.columns)
    }
$$;
Model Registry
Storing Models
-- Model registry: one row per registered model version.
-- model_id is generated by AUTOINCREMENT because the INSERT that registers a
-- model does not supply it.
-- Caution: CREATE OR REPLACE drops any rows already registered.
CREATE OR REPLACE TABLE model_registry (
    model_id            INTEGER AUTOINCREMENT PRIMARY KEY,
    model_name          STRING,
    model_version       STRING,
    model_binary        BINARY,         -- serialized model
    training_date       TIMESTAMP_NTZ,
    performance_metrics VARIANT,        -- object keyed by metric name
    feature_columns     VARIANT,
    status              STRING          -- 'ACTIVE' marks versions eligible for serving
);
-- Register the trained model as version v1.0 and mark it ACTIVE.
-- The 'accuracy' metric is taken from test_accuracy: the metrics query on
-- classification_model_output exposes training_/validation_/test_accuracy and
-- no plain accuracy column. The key stays 'accuracy' because the
-- model_versions view reads performance_metrics:accuracy.
-- NOTE(review): assumes classification_model_output also carries model_binary
-- and feature_columns; confirm against the training output.
INSERT INTO model_registry (
    model_name,
    model_version,
    model_binary,
    training_date,
    performance_metrics,
    feature_columns,
    status
)
SELECT
    'customer_classification',
    'v1.0',
    model_binary,
    CURRENT_TIMESTAMP(),
    OBJECT_CONSTRUCT(
        'accuracy', test_accuracy,
        'f1_score', f1_score,
        'auc_roc',  auc_roc
    ),
    feature_columns,
    'ACTIVE'
FROM classification_model_output;
Model Versioning
-- View over the registry that flattens the headline metrics and ranks each
-- model's versions from newest (version_rank = 1) to oldest by training_date.
CREATE OR REPLACE VIEW model_versions AS
SELECT
    mr.model_name,
    mr.model_version,
    mr.training_date,
    mr.performance_metrics:accuracy::FLOAT AS accuracy,
    mr.performance_metrics:f1_score::FLOAT AS f1_score,
    mr.status,
    ROW_NUMBER() OVER (
        PARTITION BY mr.model_name
        ORDER BY mr.training_date DESC
    ) AS version_rank
FROM model_registry AS mr;
-- Fetch the most recently trained version of customer_classification.
SELECT
    model_name,
    model_version,
    training_date,
    accuracy,
    f1_score,
    status,
    version_rank
FROM model_versions
WHERE model_name = 'customer_classification'
  AND version_rank = 1;
Model Serving
Batch Inference
-- Stage the scoring input: one row per customer carrying the four model
-- features plus the binary of the newest ACTIVE customer_classification model.
-- The scalar subquery is uncorrelated, so every row receives the same model
-- value (NULL when no ACTIVE version is registered).
CREATE OR REPLACE TABLE predictions AS
SELECT
    customer_id,
    (
        SELECT model_binary
        FROM model_registry
        WHERE model_name = 'customer_classification'
          AND status = 'ACTIVE'
        ORDER BY training_date DESC
        LIMIT 1
    ) AS model,
    total_spend,
    total_transactions,
    customer_tenure_days,
    days_since_last_order
FROM feature_store.customer_features;
-- Score every row of predictions and append the results to customer_scores.
--
-- Returns: the string 'SUCCESS: Customers scored' once the INSERT completes.
-- The probability is computed once per row in the derived table and reused
-- for the label, so score and label always come from the same evaluation.
-- A score above 0.5 is labelled 'HIGH_VALUE'; anything else, including a NULL
-- score, is labelled 'STANDARD'.
-- NOTE(review): customer_scores is not created anywhere in this file, and
-- PREDICT_PROBABILITY is not defined here; confirm both exist in the target
-- schema.
CREATE OR REPLACE PROCEDURE score_customers()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
    INSERT INTO customer_scores (
        customer_id,
        prediction_score,
        prediction_label,
        scored_at
    )
    SELECT
        scored.customer_id,
        scored.prediction_score,
        CASE
            WHEN scored.prediction_score > 0.5 THEN 'HIGH_VALUE'
            ELSE 'STANDARD'
        END AS prediction_label,
        CURRENT_TIMESTAMP() AS scored_at
    FROM (
        SELECT
            customer_id,
            PREDICT_PROBABILITY(model, OBJECT_CONSTRUCT(
                'total_spend', total_spend,
                'total_transactions', total_transactions,
                'customer_tenure_days', customer_tenure_days,
                'days_since_last_order', days_since_last_order
            )) AS prediction_score
        FROM predictions
    ) AS scored;

    RETURN 'SUCCESS: Customers scored';
END;
$$;
Real-Time Inference
-- SQL function that scores a single customer on demand.
--
-- Parameters: the four model features, in the order shown below.
-- Returns: FLOAT result of PREDICT_PROBABILITY evaluated with the newest
--          ACTIVE customer_classification model from model_registry.
-- NOTE(review): PREDICT_PROBABILITY is not defined in this file; confirm it
-- exists in the target schema.
CREATE OR REPLACE FUNCTION predict_customer_value(
    total_spend FLOAT,
    total_transactions INTEGER,
    customer_tenure_days INTEGER,
    days_since_last_order INTEGER
)
RETURNS FLOAT
LANGUAGE SQL
AS
$$
    SELECT PREDICT_PROBABILITY(
        (
            SELECT model_binary
            FROM model_registry
            WHERE model_name = 'customer_classification'
              AND status = 'ACTIVE'
            ORDER BY training_date DESC
            LIMIT 1
        ),
        OBJECT_CONSTRUCT(
            'total_spend', total_spend,
            'total_transactions', total_transactions,
            'customer_tenure_days', customer_tenure_days,
            'days_since_last_order', days_since_last_order
        )
    )
$$;
-- Example: call the scoring function inline for every customer in the feature table.
SELECT
    cf.customer_id,
    predict_customer_value(
        cf.total_spend,
        cf.total_transactions,
        cf.customer_tenure_days,
        cf.days_since_last_order
    ) AS value_score
FROM feature_store.customer_features AS cf;
Model Monitoring
Performance Tracking
-- Daily model-quality log: one row per model version per prediction date.
-- monitoring_id is generated by AUTOINCREMENT because the daily INSERT does
-- not supply it.
-- Caution: CREATE OR REPLACE drops any monitoring history already collected.
CREATE OR REPLACE TABLE model_monitoring (
    monitoring_id       INTEGER AUTOINCREMENT PRIMARY KEY,
    model_name          STRING,
    model_version       STRING,
    prediction_date     DATE,
    total_predictions   INTEGER,
    correct_predictions INTEGER,
    accuracy            FLOAT,   -- correct_predictions / total_predictions
    avg_confidence      FLOAT
);
-- Record today's accuracy for customer_classification v1.0 by comparing
-- predictions made today with the matching rows in actual_results.
-- The aggregate has no GROUP BY, so exactly one row is inserted per run, even
-- when no predictions match (count 0, NULL for the other measures).
-- NOTE(review): the predictions table built earlier in this file has no
-- prediction_date, predicted or confidence columns; confirm which table and
-- columns this statement is meant to read.
INSERT INTO model_monitoring (
    model_name,
    model_version,
    prediction_date,
    total_predictions,
    correct_predictions,
    accuracy,
    avg_confidence
)
SELECT
    'customer_classification'                              AS model_name,
    'v1.0'                                                 AS model_version,
    CURRENT_DATE()                                         AS prediction_date,
    COUNT(*)                                               AS total_predictions,
    SUM(CASE WHEN predicted = actual THEN 1 ELSE 0 END)    AS correct_predictions,
    AVG(CASE WHEN predicted = actual THEN 1.0 ELSE 0.0 END) AS accuracy,
    AVG(confidence)                                        AS avg_confidence
FROM predictions AS p
INNER JOIN actual_results AS a
    ON p.customer_id = a.customer_id
WHERE p.prediction_date = CURRENT_DATE();
-- Show the 30 most recent monitoring rows for customer_classification together
-- with the change in accuracy from the previous prediction date.
-- The previous value is computed once in the derived table and reused.
SELECT
    trend.prediction_date,
    trend.accuracy,
    trend.prev_accuracy,
    trend.accuracy - trend.prev_accuracy AS accuracy_change
FROM (
    SELECT
        prediction_date,
        accuracy,
        LAG(accuracy) OVER (ORDER BY prediction_date) AS prev_accuracy
    FROM model_monitoring
    WHERE model_name = 'customer_classification'
) AS trend
ORDER BY trend.prediction_date DESC
LIMIT 30;
Data Drift Detection
-- Flag drift in the total_spend feature.
--
-- Compares the mean of total_spend over rows updated in the last 7 days with
-- the mean over older rows, and reports drift when the two differ by more
-- than 2 historical standard deviations.
-- Returns: a 'WARNING: ...' string when drift is detected, otherwise a
--          'SUCCESS: ...' string. When either window is empty, or the
--          historical standard deviation is zero or NULL, the comparison
--          cannot be made and the procedure reports no drift.
-- NOTE(review): this relies on an updated_at column in
-- feature_store.customer_features; confirm the table provides it.
CREATE OR REPLACE PROCEDURE detect_data_drift()
RETURNS STRING
LANGUAGE SQL
AS
$$
DECLARE
    drift_detected BOOLEAN DEFAULT FALSE;
BEGIN
    -- Compare feature distributions
    WITH current_stats AS (
        SELECT
            AVG(total_spend) AS mean_spend
        FROM feature_store.customer_features
        WHERE updated_at >= DATEADD('day', -7, CURRENT_DATE())
    ),
    historical_stats AS (
        SELECT
            AVG(total_spend)    AS mean_spend,
            STDDEV(total_spend) AS std_spend
        FROM feature_store.customer_features
        WHERE updated_at < DATEADD('day', -7, CURRENT_DATE())
    )
    SELECT
        -- NULLIF guards the division; COALESCE maps an undefined comparison
        -- to FALSE so the IF below always receives a real boolean.
        COALESCE(
            (ABS(cur.mean_spend - hist.mean_spend) / NULLIF(hist.std_spend, 0)) > 2,
            FALSE
        )
    INTO :drift_detected
    FROM current_stats AS cur
    CROSS JOIN historical_stats AS hist;

    IF (drift_detected) THEN
        RETURN 'WARNING: Data drift detected in total_spend feature';
    ELSE
        RETURN 'SUCCESS: No significant data drift detected';
    END IF;
END;
$$;
Model monitoring should track both model performance (accuracy, F1) and data quality (feature distributions, missing values). Drift in either can indicate model degradation.
MLOps Best Practices
| Practice | Implementation | Benefit |
|---|---|---|
| Feature Store | Centralized feature management | Consistency between training and inference |
| Model Registry | Version control for models | Reproducibility and governance |
| Automated Training | Scheduled retraining pipelines | Model freshness |
| A/B Testing | Compare model versions | Data-driven model selection |
| Drift Detection | Monitor data and model drift | Early degradation detection |
Key Takeaways:
- Feature store centralizes feature engineering and management
- ML Functions let you train models with SQL statements alone, without writing custom training code
- Model registry provides versioning and governance
- Models can be served in more than one way: batch scoring through a stored procedure, or real-time scoring through a SQL function called inline in queries
- Comprehensive monitoring tracks performance and drift
- Automated pipelines ensure model freshness