dbt + AI/ML Integration
AI/ML Integration Architecture
ML Pipeline Flow
Formal Definitions
Feature Engineering
Feature engineering is the process of using domain knowledge to create input variables (features) from raw data that improve machine learning model performance. In dbt, feature engineering is implemented through SQL models that transform raw data into model-ready features: aggregations, encodings, time-based features, and interaction features.
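Independent of dbt, the four kinds of feature named above can be sketched in plain Python. This is a minimal illustration that assumes raw data arrives as a hypothetical list of `(user_id, event_date, event_type, amount)` tuples. `build_user_features` is an illustrative name, not part of dbt. Filtering on an `as_of` date keeps time-based features reproducible for historical dates.

```python
from collections import defaultdict
from datetime import date


def build_user_features(events, as_of):
    """Build one feature dict per user from raw (user_id, event_date, event_type, amount) rows.

    Only events on or before `as_of` are used, so the same function can
    rebuild the features as they looked on any historical date.
    """
    rows_by_user = defaultdict(list)
    for user_id, event_date, event_type, amount in events:
        if event_date <= as_of:
            rows_by_user[user_id].append((event_date, event_type, amount))

    features = {}
    for user_id, rows in rows_by_user.items():
        purchases = [amount for _, event_type, amount in rows if event_type == "purchase"]
        total_events = len(rows)
        total_revenue = sum(purchases)
        last_active = max(event_date for event_date, _, _ in rows)
        segment = "purchaser" if purchases else "browser"
        features[user_id] = {
            # Aggregations
            "total_events": total_events,
            "total_revenue": total_revenue,
            # Time-based feature, measured from as_of rather than from today
            "days_since_last_active": (as_of - last_active).days,
            # One-hot encoding of a categorical segment
            "segment_purchaser": int(segment == "purchaser"),
            "segment_browser": int(segment == "browser"),
            # Interaction feature combining two aggregates
            "revenue_per_event": total_revenue / total_events,
        }
    return features


# Hypothetical sample data; the last event falls after as_of and is ignored
events = [
    ("u1", date(2024, 1, 1), "page_view", 0.0),
    ("u1", date(2024, 1, 5), "purchase", 50.0),
    ("u2", date(2024, 1, 3), "page_view", 0.0),
    ("u1", date(2024, 2, 1), "purchase", 999.0),
]
features = build_user_features(events, as_of=date(2024, 1, 10))
```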
Feature Store
A feature store is a centralized repository for storing, serving, and managing machine learning features. It provides consistent feature computation across training and serving, feature versioning, and point-in-time correctness. In a dbt project, a feature store is typically approximated with materialized tables or views of precomputed features that carry temporal validity columns.
Point-in-Time Correctness
Point-in-time correctness ensures that when a training set is built, each example is joined only to feature values that were available at its prediction time. This prevents data leakage, where future information inadvertently influences training. SCD Type 2 tables (for example, those produced by dbt snapshots or an incremental model), joined on their validity windows, enable point-in-time correct feature construction.
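The core of a point-in-time join is an "as of" lookup. For each labeled example, take the latest feature version whose start date is on or before the prediction date, and never a later one. The sketch below shows this with `bisect` from the standard library. It assumes a hypothetical in-memory layout: label rows as `(user_id, prediction_date, label)`, and per-user feature history sorted by `valid_from`. In pandas, `pandas.merge_asof` with `by="user_id"` performs the same backward lookup.

```python
from bisect import bisect_right
from datetime import date


def point_in_time_join(labels, feature_history):
    """Attach to each label the latest feature version valid at its prediction date.

    labels: list of (user_id, prediction_date, label)
    feature_history: dict user_id -> list of (valid_from, features), sorted by valid_from
    Labels with no version valid yet are dropped rather than filled with later data.
    """
    rows = []
    for user_id, prediction_date, label in labels:
        history = feature_history.get(user_id, [])
        valid_froms = [valid_from for valid_from, _ in history]
        # Index of the last version whose valid_from <= prediction_date
        idx = bisect_right(valid_froms, prediction_date) - 1
        if idx >= 0:
            rows.append((user_id, prediction_date, history[idx][1], label))
    return rows


history = {
    "u1": [(date(2024, 1, 1), {"total_events": 3}), (date(2024, 2, 1), {"total_events": 10})],
}
labels = [("u1", date(2024, 1, 15), 0), ("u1", date(2024, 2, 15), 1), ("u1", date(2023, 12, 1), 0)]
joined = point_in_time_join(labels, history)
```

The mid-January label sees only the January feature version, even though a newer version exists in the table.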
Model Registry
A model registry is a centralized repository for tracking machine learning models, their versions, metadata, and performance metrics. It enables model versioning, A/B testing, and rollback capabilities. dbt can generate model metadata tables that serve as lightweight registries.
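The bookkeeping behind such a lightweight registry can be sketched in Python. Each `ModelVersion` record stands for one row of a metadata table. Promotion and rollback only change which row is flagged as production. The class, its columns, and the `promote`/`rollback` helpers are hypothetical illustrations, not a dbt or MLflow API.

```python
from dataclasses import dataclass, replace
from datetime import datetime
from typing import List, Optional


@dataclass(frozen=True)
class ModelVersion:
    """One row of a lightweight registry table (columns are illustrative)."""
    model_name: str
    version: int
    trained_at: datetime
    training_set: str  # dbt model the training data came from
    f1_score: float
    is_production: bool = False


def promote(registry: List[ModelVersion], model_name: str, version: int) -> List[ModelVersion]:
    """Return a new registry where exactly `version` of `model_name` is in production."""
    return [
        replace(m, is_production=(m.version == version)) if m.model_name == model_name else m
        for m in registry
    ]


def production_version(registry: List[ModelVersion], model_name: str) -> Optional[ModelVersion]:
    """Return the version currently flagged as production, or None."""
    return next((m for m in registry if m.model_name == model_name and m.is_production), None)


def rollback(registry: List[ModelVersion], model_name: str) -> List[ModelVersion]:
    """Promote the newest version older than the current production version, if any."""
    current = production_version(registry, model_name)
    if current is None:
        return registry
    older = [m for m in registry if m.model_name == model_name and m.version < current.version]
    if not older:
        return registry
    return promote(registry, model_name, max(older, key=lambda m: m.version).version)
```

Keeping every version as an immutable row, and moving only the production flag, means a rollback never loses the history needed for A/B comparisons.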
Detailed Explanation
dbt transforms raw data into clean, tested, documented datasets, which is exactly what ML pipelines need. By integrating dbt with ML workflows, teams can build reproducible feature engineering pipelines with built-in quality checks.
Why dbt for ML
- Reproducibility - dbt models are version-controlled code that can be rebuilt on demand from the same sources
- Data Quality - Built-in tests ensure feature correctness
- Documentation - Self-documenting features for ML teams
- Lineage - Track which raw data feeds into each feature
- Scheduling - Automated feature refresh with dbt Cloud jobs
ML Integration Patterns
| Pattern | Description | Use Case |
|---|---|---|
| Feature Store | Materialized feature tables | Batch feature serving; source for online stores |
| Training Sets | Point-in-time correct datasets | Model training |
| Prediction Logging | Store model predictions | Monitoring |
| Drift Detection | Track feature distributions | Model health |
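"Track feature distributions" can be made concrete with one common drift statistic, the Population Stability Index (PSI). This is a choice made here for illustration; the text above does not prescribe a metric. PSI compares the share of a baseline sample and a current sample falling into the same buckets. The sketch below assumes fixed, caller-supplied bin edges. Rules of thumb vary, often treating values above roughly 0.1 to 0.25 as meaningful drift.

```python
import math
from bisect import bisect_right


def population_stability_index(baseline, current, bin_edges, eps=1e-6):
    """PSI of one numeric feature between a baseline sample and a current sample.

    Both samples are bucketed with the same bin_edges; values outside the edges
    fall into the first or last bucket, and eps keeps empty buckets out of log(0).
    """
    n_bins = len(bin_edges) - 1

    def proportions(values):
        counts = [0] * n_bins
        for v in values:
            idx = bisect_right(bin_edges, v) - 1
            counts[min(max(idx, 0), n_bins - 1)] += 1
        total = sum(counts) or 1
        return [max(c / total, eps) for c in counts]

    expected, actual = proportions(baseline), proportions(current)
    # Sum over buckets of (actual - expected) * ln(actual / expected)
    return sum((a - e) * math.log(a / e) for e, a in zip(expected, actual))
```

In a dbt setup, the baseline sample could come from the training set and the current sample from the latest feature table. The PSI values would then be logged per feature and run.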
dbt is not a replacement for dedicated ML platforms (MLflow, Kubeflow, SageMaker). Instead, it complements them by handling the data preparation layer. Use dbt for feature engineering and data quality, and integrate with ML platforms for model training and serving.
Use dbt's incremental materialization for feature stores to efficiently update features as new data arrives. Combine with SCD Type 2 tables for temporal feature validity. This enables point-in-time correct feature joins without recomputing the entire feature table.
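The bookkeeping of an incremental SCD Type 2 refresh is easier to see outside SQL. The in-memory Python sketch below follows the steps described above. On each run, it closes the open version of every user whose tracked values changed, appends a new open version, and leaves everything else untouched. An `as_of` lookup then answers point-in-time queries. `FeatureVersion`, `scd2_update`, and `as_of` are hypothetical names for illustration, not dbt APIs.

```python
from dataclasses import dataclass, replace
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
class FeatureVersion:
    user_id: str
    features: tuple  # tracked feature values
    valid_from: datetime
    valid_to: Optional[datetime] = None  # None means this is the open version

    @property
    def is_current(self):
        return self.valid_to is None


def scd2_update(history, current, now):
    """Apply one incremental refresh.

    history: list of FeatureVersion rows already stored
    current: dict user_id -> tuple of freshly computed feature values
    """
    open_rows = {row.user_id: row for row in history if row.is_current}
    # New users, or users whose values differ from their open version
    changed = {u: v for u, v in current.items()
               if u not in open_rows or open_rows[u].features != v}
    # Close superseded versions; closed and unchanged rows pass through as-is
    updated = [
        replace(row, valid_to=now) if row.is_current and row.user_id in changed else row
        for row in history
    ]
    updated += [FeatureVersion(u, v, valid_from=now) for u, v in changed.items()]
    return updated


def as_of(history, user_id, ts):
    """Return the feature values that were valid for user_id at time ts, or None."""
    for row in history:
        if row.user_id == user_id and row.valid_from <= ts and (row.valid_to is None or ts < row.valid_to):
            return row.features
    return None
```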
Code Examples
Feature Engineering Model
-- models/features/user_features.sql
{{
config(
materialized='table',
tags=['features', 'ml']
)
}}
{#- Aggregate each source to one row per user before joining. Joining raw events
directly to purchases would repeat every purchase once per event and inflate sums. -#}
with event_features as (
select
user_id,
count(distinct event_id) as total_events,
count(distinct session_id) as total_sessions,
max(event_date) as last_active_date,
count(distinct case when event_type = 'page_view' then event_id end) as page_views,
count(distinct case when event_type = 'add_to_cart' then event_id end) as add_to_carts,
count(distinct case when event_type = 'checkout' then event_id end) as checkouts
from {{ ref('fct_events') }}
group by 1
),
session_features as (
-- Count each session once per user, then attach its duration
select
e.user_id,
avg(s.session_duration_seconds) as avg_session_duration
from (select distinct user_id, session_id from {{ ref('fct_events') }}) e
inner join {{ ref('fct_user_sessions') }} s on e.session_id = s.session_id
group by 1
),
purchase_features as (
select
user_id,
count(distinct purchase_id) as total_purchases,
sum(purchase_amount) as total_revenue,
avg(purchase_amount) as avg_order_value,
max(purchase_date) as last_purchase_date
from {{ ref('fct_purchases') }}
group by 1
),
user_features as (
select
e.user_id,
-- Engagement features
e.total_events,
e.total_sessions,
s.avg_session_duration,
e.last_active_date,
datediff(day, e.last_active_date, current_date()) as days_since_last_active,
-- Purchase features (users without purchases get zero counts and revenue)
coalesce(p.total_purchases, 0) as total_purchases,
coalesce(p.total_revenue, 0) as total_revenue,
p.avg_order_value,
p.last_purchase_date,
datediff(day, p.last_purchase_date, current_date()) as days_since_last_purchase,
-- Behavioral features
e.page_views,
e.add_to_carts,
e.checkouts,
-- Derived features
case
when coalesce(p.total_purchases, 0) > 0 then 'purchaser'
when e.add_to_carts > 0 then 'cart_abandoner'
else 'browser'
end as user_segment,
case
when datediff(day, e.last_active_date, current_date()) <= 7 then 'active'
when datediff(day, e.last_active_date, current_date()) <= 30 then 'latent'
else 'churned'
end as activity_status
from event_features e
left join session_features s on e.user_id = s.user_id
left join purchase_features p on e.user_id = p.user_id
)
select * from user_features
Training Set Construction
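-- models/ml/training_sets/churn_training_set.sql
{{
config(
materialized='table',
tags=['ml', 'training']
)
}}
{#- Point-in-time correct training set for churn prediction.
Label: 1 if the user has no events in the 30 days after the prediction date. -#}
with prediction_dates as (
-- One candidate prediction date per user per active day, restricted to dates
-- whose 30-day label window has fully elapsed
select distinct
user_id,
event_date as prediction_date
from {{ ref('fct_events') }}
where event_date <= dateadd(day, -30, current_date())
),
labels as (
select
d.user_id,
d.prediction_date,
case when count(e.event_id) = 0 then 1 else 0 end as is_churned
from prediction_dates d
left join {{ ref('fct_events') }} e
on e.user_id = d.user_id
and e.event_date > d.prediction_date
and e.event_date <= dateadd(day, 30, d.prediction_date)
group by 1, 2
),
feature_snapshot as (
select * from {{ ref('user_features_scd2') }}
),
training_data as (
select
l.user_id,
l.prediction_date,
f.total_events,
f.total_sessions,
f.avg_session_duration,
-- Recomputed relative to the prediction date, not to today
datediff(day, f.last_active_date, l.prediction_date) as days_since_last_active,
f.total_purchases,
f.total_revenue,
f.avg_order_value,
l.is_churned as label
from labels l
-- Join the single feature version that was valid at the prediction date,
-- so no information from after that date reaches the features
inner join feature_snapshot f
on f.user_id = l.user_id
and f.valid_from <= l.prediction_date
and (f.valid_to > l.prediction_date or f.valid_to is null)
)
select * from training_data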
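The label definition is the part of a churn training set that most easily leaks. The Python sketch below assumes churn means "no events in the 30 days after the prediction date". It skips any prediction date whose 30-day window has not fully elapsed, because its label is not yet known. `churn_labels` and its input layout (a dict of user id to event dates) are hypothetical.

```python
from datetime import date, timedelta


def churn_labels(event_dates_by_user, today, window_days=30):
    """Label each (user, active day) 1 if the user has no events in the next window_days.

    Prediction dates whose window has not fully elapsed by `today` are skipped,
    since their outcome cannot be observed yet.
    """
    window = timedelta(days=window_days)
    labels = []
    for user_id, dates in event_dates_by_user.items():
        days = sorted(set(dates))
        for d in days:
            if d + window > today:
                continue  # label window still open
            churned = not any(d < other <= d + window for other in days)
            labels.append((user_id, d, int(churned)))
    return labels
```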
Model Performance Tracking
-- models/ml/model_performance.sql
{{
config(
materialized='table',
tags=['ml', 'monitoring']
)
}}
with predictions as (
select * from {{ ref('model_predictions') }}
),
actuals as (
select * from {{ ref('actual_outcomes') }}
),
performance as (
select
p.model_name,
p.model_version,
p.prediction_date,
-- Classification metrics
count(*) as total_predictions,
sum(case when p.predicted = a.actual then 1 else 0 end) as correct_predictions,
avg(case when p.predicted = a.actual then 1.0 else 0.0 end) as accuracy,
-- Confusion-matrix counts for precision/recall
sum(case when p.predicted = 1 and a.actual = 1 then 1 else 0 end) as true_positives,
sum(case when p.predicted = 1 and a.actual = 0 then 1 else 0 end) as false_positives,
sum(case when p.predicted = 0 and a.actual = 1 then 1 else 0 end) as false_negatives,
-- Revenue impact
sum(case when p.predicted = 1 and a.actual = 1 then a.actual_revenue else 0 end) as recovered_revenue,
sum(a.actual_revenue) as total_revenue
from predictions p
-- Match each prediction to the outcome for the same user and prediction date
-- (assumes actual_outcomes carries prediction_date; joining on user_id alone
-- multiplies rows when a user has several predictions)
inner join actuals a
on p.user_id = a.user_id
and p.prediction_date = a.prediction_date
group by 1, 2, 3
),
rates as (
-- Multiply by 1.0 so integer counts are not truncated by integer division
select
*,
true_positives * 1.0 / nullif(true_positives + false_positives, 0) as precision,
true_positives * 1.0 / nullif(true_positives + false_negatives, 0) as recall,
recovered_revenue * 1.0 / nullif(total_revenue, 0) as revenue_recovery_rate
from performance
),
final as (
-- F1 is computed in its own CTE because not every warehouse allows
-- referencing a column alias defined in the same select
select
*,
2 * precision * recall / nullif(precision + recall, 0) as f1_score
from rates
)
select * from final
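When validating a monitoring model like this, it helps to recompute the metrics from raw `(predicted, actual)` pairs and compare the results. The Python sketch below applies the same formulas as the SQL: precision = TP / (TP + FP), recall = TP / (TP + FN), and F1 = 2PR / (P + R). It returns `None` wherever the SQL's `nullif` would yield null. The function name and input shape are assumptions for illustration.

```python
def classification_metrics(pairs):
    """Compute accuracy, precision, recall, and F1 from (predicted, actual) 0/1 pairs."""
    if not pairs:
        return {"accuracy": None, "precision": None, "recall": None, "f1_score": None}
    tp = sum(1 for p, a in pairs if p == 1 and a == 1)
    fp = sum(1 for p, a in pairs if p == 1 and a == 0)
    fn = sum(1 for p, a in pairs if p == 0 and a == 1)
    correct = sum(1 for p, a in pairs if p == a)
    # None plays the role of SQL null from nullif(..., 0)
    precision = tp / (tp + fp) if tp + fp else None
    recall = tp / (tp + fn) if tp + fn else None
    if precision is None or recall is None or precision + recall == 0:
        f1 = None
    else:
        f1 = 2 * precision * recall / (precision + recall)
    return {"accuracy": correct / len(pairs), "precision": precision, "recall": recall, "f1_score": f1}
```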
Feature Store with Temporal Validity
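-- models/features/user_features_scd2.sql
{{
config(
materialized='incremental',
unique_key='surrogate_key',
tags=['features', 'scd2']
)
}}
{#- Versioned feature columns. Columns computed relative to current_date()
(days_since_*, activity_status) are excluded because they change on every run. -#}
{%- set feature_cols = ['total_events', 'total_sessions', 'avg_session_duration', 'last_active_date',
'total_purchases', 'total_revenue', 'avg_order_value', 'user_segment'] -%}
with current_features as (
select user_id, {{ feature_cols | join(', ') }}
from {{ ref('user_features') }}
),
{% if is_incremental() %}
previous_features as (
-- Open (current) versions already stored in this table
select * from {{ this }} where is_current = true
),
changed_features as (
-- New users, or users whose features differ from their open version
-- ("is distinct from" treats two nulls as equal, unlike !=)
select c.*
from current_features c
left join previous_features p on c.user_id = p.user_id
where p.user_id is null
{%- for col in feature_cols %}
or c.{{ col }} is distinct from p.{{ col }}
{%- endfor %}
),
expired as (
-- Re-emit each superseded open version under its existing surrogate_key;
-- the incremental merge on unique_key then updates that row to close it.
-- (Data-modifying statements such as UPDATE cannot run inside a model's CTEs.)
select
p.surrogate_key,
p.user_id,
{%- for col in feature_cols %}
p.{{ col }},
{%- endfor %}
p.valid_from,
current_timestamp() as valid_to,
false as is_current
from previous_features p
inner join changed_features c on p.user_id = c.user_id
),
{% endif %}
new_versions as (
select
{{ dbt_utils.generate_surrogate_key(['user_id', 'current_timestamp()']) }} as surrogate_key,
user_id,
{{ feature_cols | join(', ') }},
current_timestamp() as valid_from,
cast(null as timestamp) as valid_to,
true as is_current
from {% if is_incremental() %}changed_features{% else %}current_features{% endif %}
)
select * from new_versions
{% if is_incremental() %}
union all
select * from expired
{% endif %}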
dbt Python Model for ML
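# models/ml/churn_prediction.py
# Written for dbt-snowflake, where dbt.ref() returns a Snowpark DataFrame.
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split


def model(dbt, session):
    # Declare third-party packages so Snowflake provides them for the run
    dbt.config(materialized="table", packages=["pandas", "scikit-learn"])

    features_df = dbt.ref("churn_training_set").to_pandas()
    # Snowflake returns upper-case column names; normalize to lower case
    features_df.columns = [c.lower() for c in features_df.columns]

    # Drop identifiers and the label; fill gaps such as avg_order_value for non-purchasers
    X = features_df.drop(columns=["user_id", "prediction_date", "label"]).fillna(0)
    y = features_df["label"]

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    clf = RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42)
    clf.fit(X_train, y_train)

    # Score the held-out rows
    return pd.DataFrame({
        "user_id": features_df.loc[X_test.index, "user_id"].to_numpy(),
        "prediction_date": features_df.loc[X_test.index, "prediction_date"].to_numpy(),
        "predicted_churn": clf.predict(X_test),
        "churn_probability": clf.predict_proba(X_test)[:, 1],
    })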
Comparison: dbt + ML Approaches
| Approach | Description | Complexity | Best For |
|---|---|---|---|
| dbt Only | Feature engineering in SQL | Low | Simple ML pipelines |
| dbt + Python Models | Python models for training | Medium | scikit-learn, pandas |
| dbt + ML Platform | Integration with MLflow/SageMaker | High | Enterprise ML |
| dbt + Feature Store | Dedicated feature store | High | Real-time serving |
Best Practices
- Separate concerns - dbt for features, ML platform for training
- Feature versioning - Use SCD Type 2 for temporal feature validity
- Point-in-time correctness - Always prevent data leakage in training sets
- Data quality - Test features with dbt tests before ML training
- Feature monitoring - Track feature distributions for drift detection
- Reproducibility - Version control all feature engineering logic
- Documentation - Document feature definitions for ML teams
- Incremental updates - Use incremental models for efficient feature refresh
See Also
- Incremental Models - Incremental feature updates
- SCD Type 2 - Temporal feature validity
- dbt Best Practices - Testing and documentation
- Python Models - Python model integration