CW

Snowflake ML Operations (MLOps)

Free Lesson

Advertisement

Snowflake ML Operations (MLOps)

Snowflake provides a comprehensive MLOps platform that enables end-to-end machine learning workflows directly within the data warehouse.

Feature Store

Creating Feature Tables

-- Feature store: a dedicated schema holding one row of aggregated
-- behavioural features per customer, derived from the raw orders table.
CREATE SCHEMA IF NOT EXISTS feature_store;

-- customer_features: spend, recency/tenure and breadth features keyed by customer_id.
CREATE OR REPLACE TABLE feature_store.customer_features AS
SELECT
  o.customer_id,
  -- Spend and volume
  COUNT(*)      AS total_transactions,
  SUM(o.amount) AS total_spend,
  AVG(o.amount) AS avg_transaction_amount,
  MAX(o.amount) AS max_transaction_amount,
  -- Whole days from the first / most recent order until today
  DATEDIFF('day', MIN(o.order_date), CURRENT_DATE()) AS customer_tenure_days,
  DATEDIFF('day', MAX(o.order_date), CURRENT_DATE()) AS days_since_last_order,
  -- Breadth of activity: distinct calendar months and product categories
  COUNT(DISTINCT DATE_TRUNC('month', o.order_date)) AS active_months,
  COUNT(DISTINCT o.product_category)                AS unique_categories_purchased
FROM orders AS o
GROUP BY o.customer_id;

-- Feature versioning: v2 carries every v1 column unchanged and appends two
-- derived rate features. NULLIF turns a zero denominator (same-day customers,
-- zero active months) into NULL instead of a division-by-zero error.
CREATE OR REPLACE TABLE feature_store.customer_features_v2 AS
SELECT
  v1.*,
  v1.total_spend        / NULLIF(v1.customer_tenure_days, 0) AS daily_spend_rate,
  v1.total_transactions / NULLIF(v1.active_months, 0)        AS transactions_per_month
FROM feature_store.customer_features AS v1;

Model Training

Using ML Functions

-- Training set: four customer features plus a binary high_value_customer label.
-- NOTE(review): the label is computed from total_spend, which is also kept as a
-- feature, so a model can reproduce the label exactly (target leakage) and its
-- reported accuracy will be near-perfect. Derive the label from a later time
-- window, or exclude total_spend from the model inputs, before trusting metrics.
CREATE OR REPLACE TABLE training_data AS
SELECT
  cf.customer_id,
  cf.total_spend,
  cf.total_transactions,
  cf.customer_tenure_days,
  cf.days_since_last_order,
  -- Label: 1 when lifetime spend exceeds 1000; 0 otherwise (including NULL spend)
  CASE
    WHEN cf.total_spend > 1000 THEN 1
    ELSE 0
  END AS high_value_customer
FROM feature_store.customer_features AS cf;

-- Train a binary classifier with Snowflake ML Functions.
-- The "CREATE MODEL ... INPUT_TABLE / OUTPUT_TABLE / TRAINING_PARAMETERS" form
-- is not valid Snowflake SQL. SNOWFLAKE.ML.CLASSIFICATION is the supported
-- interface, and it handles the train/evaluation split itself.
-- customer_id is left out of the input query so the row identifier is not
-- learned as a feature.
CREATE OR REPLACE SNOWFLAKE.ML.CLASSIFICATION customer_classification(
  INPUT_DATA => SYSTEM$QUERY_REFERENCE(
    'SELECT total_spend, total_transactions, customer_tenure_days, days_since_last_order, high_value_customer FROM training_data'
  ),
  TARGET_COLNAME => 'high_value_customer'
);

-- Check model performance.
-- ML Functions keep the trained model inside the customer_classification
-- object. They do not write a classification_model_output table, so any query
-- that reads that table must be adapted to the results returned here.
CALL customer_classification!SHOW_GLOBAL_EVALUATION_METRICS();
CALL customer_classification!SHOW_EVALUATION_METRICS();

Custom Model Training

-- Stored procedure: train a scikit-learn RandomForest on a table that has a
-- high_value_customer label column, then upload the pickled model to
-- @model_stage as <model_name>.pkl.
-- Returns a VARIANT with the model name, train/test accuracy and feature list.
-- Runtime 3.11 replaces the deprecated 3.8 Python runtime.
-- NOTE(review): assumes the stage @model_stage already exists.
CREATE OR REPLACE PROCEDURE train_custom_model(
  training_table STRING,
  model_name STRING
)
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python', 'scikit-learn', 'pandas')
HANDLER = 'main'
AS
$$
def main(session, training_table, model_name):
    """Train, evaluate and stage a RandomForest classifier.

    Args:
        session: Snowpark session supplied by Snowflake.
        training_table: table to train on; must contain high_value_customer.
        model_name: base name of the staged pickle file (<model_name>.pkl).

    Returns:
        dict with model_name, train_accuracy, test_accuracy and features.
    """
    import os
    import pickle
    import tempfile

    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split

    label_col = 'high_value_customer'
    # Row identifiers carry no signal; never train on them.
    id_cols = ['customer_id']

    df = session.table(training_table).to_pandas()
    # Snowflake returns unquoted identifiers in upper case (HIGH_VALUE_CUSTOMER);
    # normalise so the lower-case names used here resolve.
    df.columns = [c.lower() for c in df.columns]

    # Prepare features and target
    X = df.drop(columns=[label_col] + [c for c in id_cols if c in df.columns])
    y = df[label_col]

    # Fixed random_state keeps the split, and so the reported scores, reproducible.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    # Train model
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    # Evaluate
    train_score = model.score(X_train, y_train)
    test_score = model.score(X_test, y_test)

    # Write the pickle to a real local file before uploading it.
    # The previous PUT referenced a file that was never written.
    # NOTE: only unpickle model files from trusted stages.
    safe_name = os.path.basename(model_name)
    local_path = os.path.join(tempfile.gettempdir(), f'{safe_name}.pkl')
    with open(local_path, 'wb') as fh:
        pickle.dump(model, fh)
    session.file.put(local_path, '@model_stage', auto_compress=False, overwrite=True)

    return {
        'model_name': model_name,
        # Plain Python floats serialise cleanly into the VARIANT result.
        'train_accuracy': float(train_score),
        'test_accuracy': float(test_score),
        'features': list(X.columns)
    }
$$;

Model Registry

Storing Models

-- Model registry: one row per registered model version.
-- model_id is generated with AUTOINCREMENT. Snowflake treats PRIMARY KEY
-- columns as NOT NULL, and registration INSERTs that omit model_id would
-- otherwise fail on a NULL key.
-- (Snowflake does not enforce PRIMARY KEY uniqueness; it is informational.)
CREATE OR REPLACE TABLE model_registry (
  model_id INTEGER AUTOINCREMENT PRIMARY KEY,
  model_name STRING,
  model_version STRING,
  model_binary BINARY,
  training_date TIMESTAMP_NTZ,
  performance_metrics VARIANT,
  feature_columns VARIANT,
  status STRING
);

-- Register the trained model as version v1.0 and mark it ACTIVE.
-- model_id is not supplied; it must be generated by the table definition.
-- The 'accuracy' metric is taken from test_accuracy. The performance query on
-- classification_model_output selects training_, validation_ and test_accuracy,
-- but it has no plain "accuracy" column.
-- NOTE(review): model_binary and feature_columns are assumed to exist in
-- classification_model_output; confirm against its actual schema.
INSERT INTO model_registry (
  model_name,
  model_version,
  model_binary,
  training_date,
  performance_metrics,
  feature_columns,
  status
)
SELECT
  'customer_classification',
  'v1.0',
  model_binary,
  CURRENT_TIMESTAMP(),
  OBJECT_CONSTRUCT(
    'accuracy', test_accuracy,
    'f1_score', f1_score,
    'auc_roc', auc_roc
  ),
  feature_columns,
  'ACTIVE'
FROM classification_model_output;

Model Versioning

-- model_versions: flattens the metrics VARIANT into typed columns and ranks each
-- model's registered versions by training_date, newest first (version_rank = 1).
CREATE OR REPLACE VIEW model_versions AS
SELECT
  r.model_name,
  r.model_version,
  r.training_date,
  CAST(r.performance_metrics:accuracy AS FLOAT) AS accuracy,
  CAST(r.performance_metrics:f1_score AS FLOAT) AS f1_score,
  r.status,
  ROW_NUMBER() OVER (PARTITION BY r.model_name ORDER BY r.training_date DESC) AS version_rank
FROM model_registry AS r;

-- Newest registered version of customer_classification, regardless of status.
SELECT *
FROM model_versions
WHERE model_name = 'customer_classification'
  AND version_rank = 1;

Model Serving

Batch Inference

-- predictions: each customer's scoring features paired with the newest ACTIVE
-- customer_classification model binary (NULL on every row when none is active).
-- NOTE(review): this copies the model blob onto every row; storing a model_id
-- reference instead would be much smaller.
CREATE OR REPLACE TABLE predictions AS
WITH active_model AS (
  -- At most one row: the most recently trained ACTIVE model
  SELECT model_binary
  FROM model_registry
  WHERE model_name = 'customer_classification'
    AND status = 'ACTIVE'
  ORDER BY training_date DESC
  LIMIT 1
)
SELECT
  cf.customer_id,
  am.model_binary AS model,
  cf.total_spend,
  cf.total_transactions,
  cf.customer_tenure_days,
  cf.days_since_last_order
FROM feature_store.customer_features AS cf
LEFT JOIN active_model AS am
  ON 1 = 1;

-- score_customers: scores every row of predictions once. It appends the
-- probability, a HIGH_VALUE / STANDARD label (threshold 0.5) and a timestamp
-- to customer_scores.
-- NOTE(review): PREDICT_PROBABILITY is not a built-in Snowflake function; it is
-- assumed to be a user-defined scoring function. customer_scores must exist.
CREATE OR REPLACE PROCEDURE score_customers()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
  INSERT INTO customer_scores (
    customer_id,
    prediction_score,
    prediction_label,
    scored_at
  )
  SELECT
    s.customer_id,
    s.prediction_score,
    -- The label reuses the score computed below. The previous version invoked
    -- the model a second time per row with an identical argument list.
    CASE
      WHEN s.prediction_score > 0.5 THEN 'HIGH_VALUE'
      ELSE 'STANDARD'
    END AS prediction_label,
    CURRENT_TIMESTAMP() AS scored_at
  FROM (
    SELECT
      p.customer_id,
      PREDICT_PROBABILITY(p.model, OBJECT_CONSTRUCT(
        'total_spend', p.total_spend,
        'total_transactions', p.total_transactions,
        'customer_tenure_days', p.customer_tenure_days,
        'days_since_last_order', p.days_since_last_order
      )) AS prediction_score
    FROM predictions AS p
  ) AS s;

  RETURN 'SUCCESS: Customers scored';
END;
$$;

Real-Time Inference

-- predict_customer_value: scalar SQL UDF that scores one customer's features
-- with the newest ACTIVE customer_classification model from model_registry.
-- NOTE(review): PREDICT_PROBABILITY is assumed to be a user-defined scoring
-- function; the registry lookup runs on every call.
CREATE OR REPLACE FUNCTION predict_customer_value(
  total_spend FLOAT,
  total_transactions INTEGER,
  customer_tenure_days INTEGER,
  days_since_last_order INTEGER
)
RETURNS FLOAT
LANGUAGE SQL
AS
$$
  SELECT PREDICT_PROBABILITY(
    (
      SELECT r.model_binary
      FROM model_registry AS r
      WHERE r.status = 'ACTIVE'
        AND r.model_name = 'customer_classification'
      ORDER BY r.training_date DESC
      LIMIT 1
    ),
    OBJECT_CONSTRUCT(
      'total_spend', total_spend,
      'total_transactions', total_transactions,
      'customer_tenure_days', customer_tenure_days,
      'days_since_last_order', days_since_last_order
    )
  )
$$;

-- Example: score every customer in the feature store with the UDF.
SELECT
  cf.customer_id,
  predict_customer_value(
    cf.total_spend,
    cf.total_transactions,
    cf.customer_tenure_days,
    cf.days_since_last_order
  ) AS value_score
FROM feature_store.customer_features AS cf;

Model Monitoring

Performance Tracking

-- model_monitoring: one row per model version per day with prediction accuracy
-- and mean confidence.
-- monitoring_id is generated with AUTOINCREMENT. Snowflake treats PRIMARY KEY
-- columns as NOT NULL, and tracking INSERTs that omit the id would otherwise fail.
CREATE OR REPLACE TABLE model_monitoring (
  monitoring_id INTEGER AUTOINCREMENT PRIMARY KEY,
  model_name STRING,
  model_version STRING,
  prediction_date DATE,
  total_predictions INTEGER,
  correct_predictions INTEGER,
  accuracy FLOAT,
  avg_confidence FLOAT
);

-- Record today's accuracy for customer_classification v1.0 by joining today's
-- predictions to the observed outcomes.
-- An aggregate with no GROUP BY always returns one row. Without the HAVING
-- clause, a day with no matched predictions would store total_predictions = 0
-- with NULL accuracy, which turns the day-over-day accuracy_change into NULL.
-- monitoring_id is not supplied; it must be generated by the table definition.
-- NOTE(review): predicted, actual, confidence and prediction_date are assumed to
-- come from predictions / actual_results; confirm their source tables.
INSERT INTO model_monitoring (
  model_name,
  model_version,
  prediction_date,
  total_predictions,
  correct_predictions,
  accuracy,
  avg_confidence
)
SELECT
  'customer_classification',
  'v1.0',
  CURRENT_DATE(),
  COUNT(*),
  SUM(CASE WHEN predicted = actual THEN 1 ELSE 0 END),
  AVG(CASE WHEN predicted = actual THEN 1.0 ELSE 0.0 END),
  AVG(confidence)
FROM predictions AS p
INNER JOIN actual_results AS a
  ON p.customer_id = a.customer_id
WHERE p.prediction_date = CURRENT_DATE()
HAVING COUNT(*) > 0;

-- Day-over-day accuracy change for customer_classification, newest 30 rows.
-- The previous-day value is computed once in the CTE and reused for the delta.
WITH daily AS (
  SELECT
    prediction_date,
    accuracy,
    LAG(accuracy) OVER (ORDER BY prediction_date) AS prev_accuracy
  FROM model_monitoring
  WHERE model_name = 'customer_classification'
)
SELECT
  prediction_date,
  accuracy,
  prev_accuracy,
  accuracy - prev_accuracy AS accuracy_change
FROM daily
ORDER BY prediction_date DESC
LIMIT 30;

Data Drift Detection

-- detect_data_drift: flags drift in total_spend when the mean over rows updated
-- in the last 7 days differs from the mean of older rows by more than 2
-- standard deviations of the older rows.
-- NOTE(review): feature_store.customer_features as built in this lesson has no
-- updated_at column; add one (or point this at a snapshot table) before use.
CREATE OR REPLACE PROCEDURE detect_data_drift()
RETURNS STRING
LANGUAGE SQL
AS
$$
DECLARE
  drift_detected BOOLEAN DEFAULT FALSE;
BEGIN
  -- Snowflake Scripting needs a colon before the target variable in SELECT ... INTO.
  -- NULLIF avoids a division-by-zero error when historical spend is constant.
  -- In that case, or when either window is empty, the comparison yields NULL.
  SELECT
    ABS(cur.mean_spend - hist.mean_spend) / NULLIF(hist.std_spend, 0) > 2
  INTO :drift_detected
  FROM (
    -- Recent window: rows updated in the last 7 days
    SELECT AVG(total_spend) AS mean_spend
    FROM feature_store.customer_features
    WHERE updated_at >= DATEADD('day', -7, CURRENT_DATE())
  ) AS cur
  CROSS JOIN (
    -- Baseline window: everything older than 7 days
    SELECT
      AVG(total_spend) AS mean_spend,
      STDDEV(total_spend) AS std_spend
    FROM feature_store.customer_features
    WHERE updated_at < DATEADD('day', -7, CURRENT_DATE())
  ) AS hist;

  IF (drift_detected IS NULL) THEN
    -- Not enough data (or zero baseline variance) to judge; do not report "no drift".
    RETURN 'WARNING: Insufficient data to evaluate drift in total_spend feature';
  ELSEIF (drift_detected) THEN
    RETURN 'WARNING: Data drift detected in total_spend feature';
  ELSE
    RETURN 'SUCCESS: No significant data drift detected';
  END IF;
END;
$$;

Model monitoring should track both model performance (accuracy, F1) and data quality (feature distributions, missing values). Drift in either can indicate model degradation.

MLOps Best Practices

Practice | Implementation | Benefit
Feature Store | Centralized feature management | Consistency between training and inference
Model Registry | Version control for models | Reproducibility and governance
Automated Training | Scheduled retraining pipelines | Model freshness
A/B Testing | Compare model versions | Data-driven model selection
Drift Detection | Monitor data and model drift | Early detection of degradation

Key Takeaways:

  • Feature store centralizes feature engineering and management
  • ML Functions enable model training from SQL alone, without custom training code
  • Model registry provides versioning and governance
  • Multiple serving patterns (batch, real-time, functions)
  • Comprehensive monitoring tracks performance and drift
  • Automated pipelines ensure model freshness

Advertisement

Need Expert Snowflake Help?

Get personalized warehouse optimization, data modeling, or Snowflake platform consulting.

Advertisement