dbt + AI/ML Integration

[Diagram: AI/ML Integration Architecture]

[Diagram: ML Pipeline Flow]

Formal Definitions

Feature Engineering

Feature engineering is the process of using domain knowledge to create input variables (features) from raw data that improve machine learning model performance. In dbt, feature engineering is implemented through SQL models that transform raw data into model-ready features: aggregations, encodings, time-based features, and interaction features.

Feature Store

A feature store is a centralized repository for storing, serving, and managing machine learning features. It provides consistent feature computation across training and serving, feature versioning, and point-in-time correctness. In a dbt project, this role is typically played by materialized tables or views that hold precomputed features along with validity timestamps.

Point-in-Time Correctness

Point-in-time correctness ensures that, when a training set is built, each example is joined only to feature values that were available at its prediction time. This prevents data leakage, where future information inadvertently influences training. In dbt, temporal joins against SCD Type 2 tables (for example, tables built with dbt snapshots) make point-in-time correct feature construction possible.
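
To make the idea concrete, here is a minimal Python sketch of a point-in-time lookup. The `history` list and the `feature_as_of` helper are hypothetical names used only for illustration; they assume one user's feature versions are stored as (valid_from, value) pairs sorted by valid_from, as an SCD Type 2 table would hold them.

```python
from bisect import bisect_right
from datetime import date

# Hypothetical SCD Type 2 history for one user: (valid_from, total_purchases),
# sorted by valid_from. Each version stays valid until the next one starts.
history = [
    (date(2024, 1, 1), 0),
    (date(2024, 2, 10), 1),
    (date(2024, 3, 5), 3),
]


def feature_as_of(history, prediction_date):
    """Return the value that was valid on prediction_date, or None."""
    starts = [valid_from for valid_from, _ in history]
    # Index of the last version whose valid_from <= prediction_date.
    idx = bisect_right(starts, prediction_date) - 1
    return history[idx][1] if idx >= 0 else None


print(feature_as_of(history, date(2024, 2, 20)))   # 1
print(feature_as_of(history, date(2023, 12, 31)))  # None
```

A lookup for 20 February sees the value 1, not the later value 3, which is the behavior a leakage-free training join needs.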

Model Registry

A model registry is a centralized repository for tracking machine learning models, their versions, metadata, and performance metrics. It enables model versioning, A/B testing, and rollback capabilities. dbt can generate model metadata tables that serve as lightweight registries.

Detailed Explanation

dbt transforms raw data into clean, tested, documented datasets, which is exactly what ML pipelines need. By integrating dbt with ML workflows, teams can build reproducible feature engineering pipelines with built-in quality checks.

Why dbt for ML

  1. Reproducibility - dbt models are version-controlled and deterministic
  2. Data Quality - Built-in tests ensure feature correctness
  3. Documentation - Self-documenting features for ML teams
  4. Lineage - Track which raw data feeds into each feature
  5. Scheduling - Automated feature refresh with dbt Cloud jobs

ML Integration Patterns

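| Pattern            | Description                     | Use Case          |
|--------------------|---------------------------------|-------------------|
| Feature Store      | Materialized feature tables     | Real-time serving |
| Training Sets      | Point-in-time correct datasets  | Model training    |
| Prediction Logging | Store model predictions         | Monitoring        |
| Drift Detection    | Track feature distributions     | Model health      |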
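
As an illustration of the drift detection pattern, the sketch below computes the Population Stability Index (PSI), one common way to compare a feature's current distribution with a baseline. The function name `psi`, the fixed `bin_edges`, and the `eps` floor are assumptions made for this example and are not part of dbt. Both inputs are assumed to be non-empty.

```python
import math


def psi(expected, actual, bin_edges, eps=1e-6):
    """Population Stability Index between a baseline and a current sample.

    bin_edges must be sorted; they split the value range into
    len(bin_edges) + 1 bins. Empty bins are floored at eps to avoid log(0).
    """
    def proportions(values):
        counts = [0] * (len(bin_edges) + 1)
        for v in values:
            # Bin index = number of edges that v has reached or passed.
            counts[sum(v >= edge for edge in bin_edges)] += 1
        return [max(c / len(values), eps) for c in counts]

    e = proportions(expected)
    a = proportions(actual)
    return sum((ai - ei) * math.log(ai / ei) for ei, ai in zip(e, a))


baseline = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
shifted = [6, 7, 8, 9, 10, 11, 12, 13, 14, 15]

print(psi(baseline, baseline, bin_edges=[3, 6]))  # 0.0
print(psi(baseline, shifted, bin_edges=[3, 6]) > 0.25)  # True
```

A commonly cited rule of thumb treats PSI above roughly 0.25 as a significant shift, but the threshold should be tuned per feature. In a dbt project the per-bin counts would usually come from a SQL model, with only the comparison done downstream.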

dbt is not a replacement for dedicated ML platforms (MLflow, Kubeflow, SageMaker). Instead, it complements them by handling the data preparation layer. Use dbt for feature engineering and data quality, and integrate with ML platforms for model training and serving.

Use dbt's incremental materialization for feature stores to efficiently update features as new data arrives. Combine with SCD Type 2 tables for temporal feature validity. This enables point-in-time correct feature joins without recomputing the entire feature table.
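
The SCD Type 2 update described here can be sketched in a few lines of Python. This is an in-memory illustration of the logic only, not how dbt executes it; `FeatureVersion` and `apply_snapshot` are hypothetical names, and a single `total_events` column stands in for the full feature set.

```python
from dataclasses import dataclass, replace
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
class FeatureVersion:
    user_id: int
    total_events: int
    valid_from: datetime
    valid_to: Optional[datetime] = None  # None means "still current"


def apply_snapshot(history, current, now):
    """Close versions whose value changed and open new ones.

    history: list of FeatureVersion rows already stored.
    current: dict mapping user_id -> freshly computed total_events.
    """
    result, users_with_open_row = [], set()
    for row in history:
        is_open = row.valid_to is None
        if is_open:
            users_with_open_row.add(row.user_id)
        new_value = current.get(row.user_id)
        if is_open and new_value is not None and new_value != row.total_events:
            result.append(replace(row, valid_to=now))  # expire old version
            result.append(FeatureVersion(row.user_id, new_value, now))
        else:
            result.append(row)  # unchanged rows are not rewritten
    for user_id, value in current.items():
        if user_id not in users_with_open_row:
            result.append(FeatureVersion(user_id, value, now))  # new user
    return result


t0, t1 = datetime(2024, 1, 1), datetime(2024, 1, 2)
history = apply_snapshot([], {1: 10, 2: 5}, t0)
history = apply_snapshot(history, {1: 12, 2: 5}, t1)
print(len(history))  # 3
```

Only user 1 changed between the two runs, so only that user gets a closed row and a new open row; user 2's row is left untouched, which is what keeps the refresh incremental.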

Code Examples

Feature Engineering Model

-- models/features/user_features.sql
{{
    config(
        materialized='table',
        tags=['features', 'ml']
    )
}}

with user_events as (
    select * from {{ ref('fct_events') }}
),

user_sessions as (
    select * from {{ ref('fct_user_sessions') }}
),

user_purchases as (
    select * from {{ ref('fct_purchases') }}
),

-- Aggregate each source to one row per user before joining. Joining the raw
-- event, session, and purchase rows directly fans out and inflates the sums.
event_agg as (
    select
        user_id,
        count(distinct event_id) as total_events,
        count(distinct session_id) as total_sessions,
        max(event_date) as last_active_date,
        count(distinct case when event_type = 'page_view' then event_id end) as page_views,
        count(distinct case when event_type = 'add_to_cart' then event_id end) as add_to_carts,
        count(distinct case when event_type = 'checkout' then event_id end) as checkouts
    from user_events
    group by 1
),

session_agg as (
    -- Map sessions to users through the events table, one row per session
    select
        us.user_id,
        avg(s.session_duration_seconds) as avg_session_duration
    from (select distinct user_id, session_id from user_events) us
    inner join user_sessions s on us.session_id = s.session_id
    group by 1
),

purchase_agg as (
    select
        user_id,
        count(distinct purchase_id) as total_purchases,
        sum(purchase_amount) as total_revenue,
        avg(purchase_amount) as avg_order_value,
        max(purchase_date) as last_purchase_date
    from user_purchases
    group by 1
),

user_features as (
    select
        e.user_id,

        -- Engagement features
        e.total_events,
        e.total_sessions,
        s.avg_session_duration,
        e.last_active_date,
        -- current_date() ties the recency features to the run date
        datediff(day, e.last_active_date, current_date()) as days_since_last_active,

        -- Purchase features (users without purchases get 0, not null)
        coalesce(p.total_purchases, 0) as total_purchases,
        coalesce(p.total_revenue, 0) as total_revenue,
        p.avg_order_value,
        p.last_purchase_date,
        datediff(day, p.last_purchase_date, current_date()) as days_since_last_purchase,

        -- Behavioral features
        e.page_views,
        e.add_to_carts,
        e.checkouts,

        -- Derived features
        case
            when coalesce(p.total_purchases, 0) > 0 then 'purchaser'
            when e.add_to_carts > 0 then 'cart_abandoner'
            else 'browser'
        end as user_segment,

        case
            when datediff(day, e.last_active_date, current_date()) <= 7 then 'active'
            when datediff(day, e.last_active_date, current_date()) <= 30 then 'latent'
            else 'churned'
        end as activity_status

    from event_agg e
    left join session_agg s on e.user_id = s.user_id
    left join purchase_agg p on e.user_id = p.user_id
)

select * from user_features

Training Set Construction

-- models/ml/training_sets/churn_training_set.sql
{{
    config(
        materialized='table',
        tags=['ml', 'training']
    )
}}

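{#- Point-in-time correct training set for churn prediction -#}

with feature_versions as (
    select
        user_id,
        valid_from,
        valid_to,
        total_events,
        total_sessions,
        avg_session_duration,
        days_since_last_active,
        total_purchases,
        total_revenue,
        avg_order_value
    from {{ ref('user_features_scd2') }}
),

user_events as (
    select user_id, event_id, event_date from {{ ref('fct_events') }}
),

-- One prediction point per user per active day. Only keep points whose
-- 30-day outcome window has fully elapsed, so every label is observable.
prediction_points as (
    select distinct
        user_id,
        event_date as prediction_date
    from user_events
    where event_date <= dateadd(day, -30, current_date())
),

-- A user is labeled churned when no activity follows within 30 days
labels as (
    select
        pp.user_id,
        pp.prediction_date,
        case when count(e.event_id) = 0 then 1 else 0 end as is_churned
    from prediction_points pp
    left join user_events e
        on e.user_id = pp.user_id
        and e.event_date > pp.prediction_date
        and e.event_date <= dateadd(day, 30, pp.prediction_date)
    group by 1, 2
),

training_data as (
    select
        f.user_id,
        l.prediction_date as snapshot_date,
        f.total_events,
        f.total_sessions,
        f.avg_session_duration,
        f.days_since_last_active,
        f.total_purchases,
        f.total_revenue,
        f.avg_order_value,
        l.is_churned as label
    from labels l
    inner join feature_versions f
        on f.user_id = l.user_id
        -- Use only the feature version that was valid at prediction time
        and f.valid_from <= l.prediction_date
        and (f.valid_to is null or f.valid_to > l.prediction_date)
)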

select * from training_data
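
One possible label definition for a churn training set looks only at a fixed window after the prediction date and refuses to label points whose window has not finished yet. The sketch below shows that rule in Python; `churn_label`, its parameters, and the 30-day default are illustrative assumptions rather than a definition taken from a specific library.

```python
from datetime import date, timedelta


def churn_label(activity_dates, prediction_date, as_of, window_days=30):
    """Return 1 (churned), 0 (retained), or None (outcome not yet known).

    activity_dates: dates on which the user was active.
    as_of: the date the training set is built.
    """
    window_end = prediction_date + timedelta(days=window_days)
    if window_end > as_of:
        # The outcome window is still open; labeling now would need future data.
        return None
    active_in_window = any(
        prediction_date < d <= window_end for d in activity_dates
    )
    return 0 if active_in_window else 1


activity = [date(2024, 1, 5), date(2024, 1, 20), date(2024, 3, 15)]
build_date = date(2024, 6, 1)

print(churn_label(activity, date(2024, 1, 5), build_date))   # 0
print(churn_label(activity, date(2024, 1, 20), build_date))  # 1
print(churn_label(activity, date(2024, 5, 20), build_date))  # None
```

Keeping the label window strictly after the prediction date, and the features strictly at or before it, is what separates the two sides of the leakage boundary.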

Model Performance Tracking

-- models/ml/model_performance.sql
{{
    config(
        materialized='table',
        tags=['ml', 'monitoring']
    )
}}

with predictions as (
    select * from {{ ref('model_predictions') }}
),

actuals as (
    select * from {{ ref('actual_outcomes') }}
),

performance as (
    select
        p.model_name,
        p.model_version,
        p.prediction_date,
        
        -- Classification metrics
        count(*) as total_predictions,
        sum(case when p.predicted = a.actual then 1 else 0 end) as correct_predictions,
        avg(case when p.predicted = a.actual then 1.0 else 0.0 end) as accuracy,
        
        -- Precision/Recall
        sum(case when p.predicted = 1 and a.actual = 1 then 1 else 0 end) as true_positives,
        sum(case when p.predicted = 1 and a.actual = 0 then 1 else 0 end) as false_positives,
        sum(case when p.predicted = 0 and a.actual = 1 then 1 else 0 end) as false_negatives,
        
        -- Revenue impact
        sum(case when p.predicted = 1 and a.actual = 1 then a.actual_revenue else 0 end) as recovered_revenue,
        sum(a.actual_revenue) as total_revenue
        
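    from predictions p
    -- Assumes one prediction and one outcome per user; add a date or
    -- prediction key to the join if users can be scored more than once
    inner join actuals a on p.user_id = a.user_id
    group by 1, 2, 3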
),

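metrics as (
    select
        *,
        -- Multiply by 1.0 so engines with integer division return a fraction
        1.0 * true_positives / nullif(true_positives + false_positives, 0) as precision,
        1.0 * true_positives / nullif(true_positives + false_negatives, 0) as recall,
        recovered_revenue / nullif(total_revenue, 0) as revenue_recovery_rate
    from performance
),

final as (
    -- F1 is computed in its own CTE because not every warehouse lets a
    -- select list reference aliases defined in the same select
    select
        *,
        2 * (precision * recall) / nullif(precision + recall, 0) as f1_score
    from metrics
)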

select * from final
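
For readers who want to check the metric definitions outside the warehouse, here is a small Python sketch of precision, recall, and F1 computed from confusion counts. The `safe_div` helper plays the role that `nullif(..., 0)` plays in SQL; both function names are hypothetical.

```python
def safe_div(numerator, denominator):
    """Return numerator / denominator, or None when the denominator is 0."""
    return numerator / denominator if denominator else None


def classification_metrics(true_positives, false_positives, false_negatives):
    precision = safe_div(true_positives, true_positives + false_positives)
    recall = safe_div(true_positives, true_positives + false_negatives)
    if precision is None or recall is None:
        f1 = None  # undefined when either component is undefined
    else:
        f1 = safe_div(2 * precision * recall, precision + recall)
    return {"precision": precision, "recall": recall, "f1_score": f1}


result = classification_metrics(true_positives=8, false_positives=2, false_negatives=8)
print(result["precision"], result["recall"])  # 0.8 0.5
```

With 8 true positives, 2 false positives, and 8 false negatives, precision is 8/10 and recall is 8/16, so F1 works out to 8/13. When there are no positive predictions or no positive actuals, the metrics come back as None instead of raising a division error.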

Feature Store with Temporal Validity

-- models/features/user_features_scd2.sql
-- Hand-rolled SCD Type 2 table. dbt snapshots provide this pattern natively.
{{
    config(
        materialized='incremental',
        unique_key='surrogate_key',
        tags=['features', 'scd2']
    )
}}

with current_features as (
    select * from {{ ref('user_features') }}
)

{% if is_incremental() %}
, previous_features as (
    select * from {{ this }} where is_current = true
),

changed_features as (
    -- Keep every feature column so new versions match the table's schema;
    -- "is distinct from" also detects changes to or from null
    select c.*
    from current_features c
    left join previous_features p on c.user_id = p.user_id
    where p.user_id is null
       or c.total_events is distinct from p.total_events
       or c.total_sessions is distinct from p.total_sessions
       or c.total_revenue is distinct from p.total_revenue
),

-- A dbt model must be a single select, so rows cannot be closed with an
-- update statement. Instead, re-emit the superseded rows under their existing
-- surrogate_key; the incremental merge on unique_key overwrites them in place.
expired as (
    select
        {{ dbt_utils.star(from=this, relation_alias='p', except=['valid_to', 'is_current']) }},
        current_timestamp() as valid_to,
        false as is_current
    from previous_features p
    inner join changed_features c on p.user_id = c.user_id
),

new_versions as (
    select
        {{ dbt_utils.generate_surrogate_key(['user_id', 'current_timestamp()']) }} as surrogate_key,
        c.*,
        current_timestamp() as valid_from,
        cast(null as timestamp) as valid_to,
        true as is_current
    from changed_features c
)

select * from expired
union all
select * from new_versions

{% else %}
select
    {{ dbt_utils.generate_surrogate_key(['user_id', 'current_timestamp()']) }} as surrogate_key,
    *,
    current_timestamp() as valid_from,
    cast(null as timestamp) as valid_to,
    true as is_current
from current_features
{% endif %}

dbt Python Model for ML

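# models/ml/churn_prediction.py
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split


def model(dbt, session):
    # Declare the packages the warehouse runtime must provide for this model
    dbt.config(materialized="table", packages=["pandas", "scikit-learn"])

    # to_pandas() is the Snowpark call; Snowpark typically returns
    # upper-case column names, so normalize them before selecting columns
    features_df = dbt.ref("churn_training_set").to_pandas()
    features_df.columns = features_df.columns.str.lower()

    # Keep only numeric features: drop the identifier, the label, and dates
    X = (
        features_df.drop(columns=["user_id", "label"])
        .select_dtypes(include="number")
        .fillna(0)
    )
    y = features_df["label"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    clf = RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42)
    clf.fit(X_train, y_train)

    # Return predictions for the held-out rows only
    return pd.DataFrame({
        "user_id": features_df.loc[X_test.index, "user_id"],
        "predicted_churn": clf.predict(X_test),
        "churn_probability": clf.predict_proba(X_test)[:, 1],
    })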

Comparison: dbt + ML Approaches

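| Approach            | Description                       | Complexity | Best For             |
|---------------------|-----------------------------------|------------|----------------------|
| dbt Only            | Feature engineering in SQL        | Low        | Simple ML pipelines  |
| dbt + Python Models | Python models for training        | Medium     | scikit-learn, pandas |
| dbt + ML Platform   | Integration with MLflow/SageMaker | High       | Enterprise ML        |
| dbt + Feature Store | Dedicated feature store           | High       | Real-time serving    |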

Best Practices

  1. Separate concerns - dbt for features, ML platform for training
  2. Feature versioning - Use SCD Type 2 for temporal feature validity
  3. Point-in-time correctness - Always prevent data leakage in training sets
  4. Data quality - Test features with dbt tests before ML training
  5. Feature monitoring - Track feature distributions for drift detection
  6. Reproducibility - Version control all feature engineering logic
  7. Documentation - Document feature definitions for ML teams
  8. Incremental updates - Use incremental models for efficient feature refresh
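
To illustrate the data quality practice, the sketch below mirrors the logic of three of dbt's built-in generic tests (`not_null`, `unique`, and `accepted_values`) in plain Python. In a real project these tests are declared in YAML and run by dbt against the warehouse; the helper names and the sample rows here are hypothetical.

```python
from collections import Counter


def not_null_failures(rows, column):
    """Rows where the column is None, like dbt's not_null test."""
    return [r for r in rows if r.get(column) is None]


def unique_failures(rows, column):
    """Values that occur more than once, like dbt's unique test."""
    counts = Counter(r[column] for r in rows if r.get(column) is not None)
    return sorted(value for value, n in counts.items() if n > 1)


def accepted_values_failures(rows, column, accepted):
    """Rows whose non-null value falls outside the accepted set."""
    return [
        r for r in rows
        if r.get(column) is not None and r[column] not in accepted
    ]


features = [
    {"user_id": 1, "total_revenue": 120.0, "activity_status": "active"},
    {"user_id": 2, "total_revenue": None, "activity_status": "latent"},
    {"user_id": 2, "total_revenue": 15.5, "activity_status": "dormant"},
]

print(unique_failures(features, "user_id"))  # [2]
print(len(not_null_failures(features, "total_revenue")))  # 1
```

A feature table that fails any of these checks should block downstream training, which is the point of running tests before the ML step.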
