dbt: Models, Tests, Incremental Loading

dbt Labs Interview

Building reliable data transformations with dbt

Interview Question

"Design a dbt project for an e-commerce company that: (1) ingests data from 5 sources, (2) has staging, intermediate, and mart layers, (3) implements incremental models for large tables, (4) includes data quality tests, (5) generates documentation. Include project structure, configuration, and example models."

Difficulty: Medium-Hard | Frequently asked at dbt Labs (formerly Fishtown Analytics) and Snowflake


Theoretical Foundation

What is dbt?

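dbt (data build tool) is a SQL-based transformation tool: you write SELECT statements, and dbt compiles and runs them inside the data warehouse in dependency order. It covers the transform step of ELT, which is the core of analytics engineering.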

Architecture Diagram
┌─────────────────────────────────────────────────────────────┐
│                    dbt Architecture                         │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Source Data ──▶ dbt Models ──▶ Transformed Data            │
│                                                             │
│  dbt Components:                                            │
│  ┌─────────────────────────────────────────────────────┐    │
│  │  Models: SQL SELECT statements                      │    │
│  │  Tests: Data quality assertions                     │    │
│  │  Snapshots: Historical data (SCD Type 2)            │    │
│  │  Macros: Reusable SQL logic                         │    │
│  │  Seeds: CSV files for reference data                │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                             │
│  dbt does NOT:                                              │
│  - Extract data (use Fivetran, Airbyte)                     │
│  - Load data (use COPY INTO, Snowpipe)                      │
│  - Orchestrate (use Airflow, Dagster)                       │
│                                                             │
└─────────────────────────────────────────────────────────────┘

dbt Models

A model is a SQL SELECT statement saved in a .sql file; dbt wraps it in the DDL needed to create a table or view.

-- models/staging/stg_orders.sql
-- Materialized as a view (default)

SELECT
    id AS order_id,
    customer_id,
    amount,
    status,
    created_at,
    updated_at
FROM {{ source('production', 'orders') }}
WHERE deleted_at IS NULL

Materializations:

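  • view: Default; stores no data and always reflects the current source data
  • table: Stored; fully rebuilt on each run
  • incremental: Stored; each run processes only new or changed rows
  • ephemeral: Inlined into downstream models as a CTE; nothing is created in the warehouse
  • materialized_view: A materialized view managed by the warehouse (adapter support varies)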

dbt Tests

Tests are assertions on models.

# models/staging/_staging__models.yml
version: 2

models:
  - name: stg_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('stg_customers')
              field: customer_id
      - name: amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'completed', 'cancelled']
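
A dbt test compiles to a SELECT that returns the failing rows, and the test passes when that query returns nothing. The sketch below imitates the `unique` and `not_null` checks using Python's built-in sqlite3 module. The table contents and the helper names `failing_unique` and `failing_not_null` are invented for illustration, and the SQL only approximates what dbt generates.

```python
import sqlite3

# Hypothetical stand-in for the stg_orders view; the rows are invented.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stg_orders (order_id INTEGER, customer_id INTEGER)")
conn.executemany(
    "INSERT INTO stg_orders VALUES (?, ?)",
    [(1, 10), (2, 11), (2, 12), (3, None)],  # order_id 2 is duplicated, one customer_id is NULL
)


def failing_unique(table, column):
    """Rows that would fail a `unique` test: values that appear more than once."""
    sql = (
        f"SELECT {column}, COUNT(*) FROM {table} "
        f"WHERE {column} IS NOT NULL GROUP BY {column} HAVING COUNT(*) > 1"
    )
    return conn.execute(sql).fetchall()


def failing_not_null(table, column):
    """Rows that would fail a `not_null` test."""
    return conn.execute(f"SELECT * FROM {table} WHERE {column} IS NULL").fetchall()


print(failing_unique("stg_orders", "order_id"))       # non-empty: the test would fail
print(failing_not_null("stg_orders", "customer_id"))  # non-empty: the test would fail
print(failing_not_null("stg_orders", "order_id"))     # empty: the test would pass
```

The same "zero rows means pass" rule applies to custom tests written as standalone SQL files.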

Incremental Models

Incremental models only process new or changed data.

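-- models/marts/fct_orders.sql
-- Note: the dict form of partition_by below is a BigQuery config; Snowflake has no partition_by (use cluster_by there)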
{{
    config(
        materialized='incremental',
        unique_key='order_id',
        incremental_strategy='merge',
        partition_by={
            "field": "order_date",
            "data_type": "date",
            "granularity": "day"
        }
    )
}}

WITH orders AS (
    SELECT
        order_id,
        customer_id,
        amount,
        status,
        DATE(created_at) AS order_date,  -- stg_orders exposes created_at, not order_date
        updated_at
    FROM {{ ref('stg_orders') }}
    
    {% if is_incremental() %}
    WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
    {% endif %}
)

SELECT * FROM orders

Incremental strategies:

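  • merge: Upsert on unique_key (default on Snowflake and BigQuery)
  • append: Insert the new rows without checking for existing ones
  • delete+insert: Delete target rows that match the incoming keys, then insert the new batch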
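
To see what a merge-style incremental run does, here is a minimal sketch using Python's built-in sqlite3 module. It is not how dbt is implemented: `INSERT OR REPLACE` stands in for the warehouse MERGE, an empty target table stands in for the first (non-incremental) run, and the function name `incremental_run` and the sample rows are invented for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# order_id plays the role of unique_key in the target table.
conn.execute("CREATE TABLE fct_orders (order_id INTEGER PRIMARY KEY, status TEXT, updated_at TEXT)")
conn.execute("CREATE TABLE stg_orders (order_id INTEGER, status TEXT, updated_at TEXT)")


def incremental_run():
    """One simulated run: filter on the high-water mark, then upsert by key."""
    (high_water,) = conn.execute("SELECT MAX(updated_at) FROM fct_orders").fetchone()
    if high_water is None:
        # Empty target: behaves like the branch where is_incremental() is false.
        rows = conn.execute("SELECT order_id, status, updated_at FROM stg_orders").fetchall()
    else:
        rows = conn.execute(
            "SELECT order_id, status, updated_at FROM stg_orders WHERE updated_at > ?",
            (high_water,),
        ).fetchall()
    # INSERT OR REPLACE is a stand-in for MERGE on unique_key.
    conn.executemany("INSERT OR REPLACE INTO fct_orders VALUES (?, ?, ?)", rows)
    return len(rows)


conn.executemany(
    "INSERT INTO stg_orders VALUES (?, ?, ?)",
    [(1, "pending", "2024-01-01"), (2, "pending", "2024-01-02")],
)
first = incremental_run()   # initial load: both rows

# Order 1 changes and order 3 is new.
conn.execute("UPDATE stg_orders SET status = 'completed', updated_at = '2024-01-03' WHERE order_id = 1")
conn.execute("INSERT INTO stg_orders VALUES (3, 'pending', '2024-01-04')")
second = incremental_run()  # only the changed and the new row
third = incremental_run()   # nothing newer than the high-water mark

result = conn.execute("SELECT order_id, status FROM fct_orders ORDER BY order_id").fetchall()
print(first, second, third, result)
```

With an append strategy the second run would have inserted a second row for order 1 instead of updating it, which is why merge needs a unique_key.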

dbt Project Structure

Architecture Diagram
my_dbt_project/
├── dbt_project.yml              # Project configuration
├── packages.yml                 # Package dependencies
├── profiles.yml                 # Connection profiles
├── models/
│   ├── staging/                 # Raw data cleaning
│   │   ├── _staging__models.yml # Schema documentation
│   │   ├── _sources.yml         # Source definitions
│   │   ├── stg_orders.sql
│   │   ├── stg_customers.sql
│   │   └── stg_products.sql
│   ├── intermediate/            # Business logic
│   │   ├── int_orders_enriched.sql
│   │   └── int_customer_lifetime.sql
│   └── marts/                   # Business-level aggregates
│       ├── _marts__models.yml
│       ├── fct_orders.sql
│       ├── dim_customers.sql
│       └── rpt_daily_revenue.sql
├── tests/                       # Custom data tests
│   ├── test_order_amount_positive.sql
│   └── test_customer_unique.sql
├── macros/                      # Reusable SQL logic
│   ├── generate_schema_name.sql
│   └── materializations.sql
├── seeds/                       # CSV reference data
│   ├── country_codes.csv
│   └── product_categories.csv
└── snapshots/                   # SCD Type 2 snapshots
    └── customers_snapshot.sql

dbt Configuration

# dbt_project.yml
name: 'ecommerce'
version: '1.0.0'
config-version: 2

profile: 'snowflake_prod'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

clean-targets:
  - "target"
  - "dbt_packages"

models:
  ecommerce:
    staging:
      +materialized: view
      +schema: staging
      +tags: ["staging"]
    intermediate:
      +materialized: ephemeral
      +tags: ["intermediate"]
    marts:
      +materialized: table
      +schema: analytics
      +tags: ["mart", "daily"]

vars:
  start_date: '2024-01-01'
  end_date: '2024-12-31'

# packages.yml (a separate file next to dbt_project.yml)
packages:
  - package: dbt-labs/dbt_utils
    version: ">=1.0.0"
  - package: dbt-labs/codegen
    version: ">=0.11.0"
  - package: calogica/dbt_expectations
    version: ">=0.10.0"

dbt Source Definitions

# models/staging/_sources.yml
version: 2

sources:
  - name: production
    database: production
    schema: public
    loader: fivetran
    loaded_at_field: _fivetran_synced
    freshness:
      warn_after: {count: 6, period: hour}
      error_after: {count: 24, period: hour}
    tables:
      - name: orders
        description: "Customer orders from e-commerce platform"
        columns:
          - name: id
            description: "Primary key"
            tests:
              - unique
          - name: customer_id
            description: "Foreign key to customers"
          - name: amount
            description: "Order total amount"
          - name: status
            description: "Order status"
          - name: created_at
            description: "Order creation timestamp"
          - name: updated_at
            description: "Last update timestamp"
      
      - name: customers
        description: "Customer information"
        columns:
          - name: id
            description: "Primary key"
          - name: email
            description: "Customer email"
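          - name: name
            description: "Customer name"

      # stg_products selects from this source, so it must be declared too
      - name: products
        description: "Product catalog"
        columns:
          - name: id
            description: "Primary key"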
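
The freshness block is evaluated by `dbt source freshness`, which compares the newest `loaded_at_field` value against the two thresholds. The rule can be sketched in Python as below. The function name `freshness_status`, the fixed clock and the sample timestamps are illustrative assumptions, and the exact boundary handling in dbt may differ.

```python
from datetime import datetime, timedelta

# Thresholds copied from the source definition above.
WARN_AFTER = timedelta(hours=6)
ERROR_AFTER = timedelta(hours=24)


def freshness_status(max_loaded_at, now):
    """Classify a source by the age of its most recently loaded row."""
    age = now - max_loaded_at
    if age > ERROR_AFTER:
        return "error"
    if age > WARN_AFTER:
        return "warn"
    return "pass"


now = datetime(2024, 6, 1, 12, 0)  # fixed clock so the example is reproducible
print(freshness_status(datetime(2024, 6, 1, 9, 0), now))    # 3 hours old
print(freshness_status(datetime(2024, 6, 1, 2, 0), now))    # 10 hours old
print(freshness_status(datetime(2024, 5, 30, 12, 0), now))  # 48 hours old
```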

dbt Macros

-- macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) -%}
    {%- set default_schema = target.schema -%}
    {%- if custom_schema_name is none -%}
        {{ default_schema }}
    {%- else -%}
        {{ default_schema }}_{{ custom_schema_name }}
    {%- endif -%}
{%- endmacro %}

-- macros/insert_audit_columns.sql
{% macro insert_audit_columns() %}
    , CURRENT_TIMESTAMP AS _loaded_at
    , '{{ invocation_id }}' AS _dbt_invocation_id
    , '{{ run_started_at }}' AS _dbt_run_started_at
{% endmacro %}

-- macros/validate_email.sql
{% macro validate_email(column_name) %}
    {{ column_name }} REGEXP '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$'
{% endmacro %}
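
The effect of generate_schema_name is easiest to see with concrete values. The following Python function mirrors the branching in the Jinja macro above; the schema names `dbt_alice` and `prod` are hypothetical target schemas chosen for illustration.

```python
def generate_schema_name(custom_schema_name, target_schema):
    """Python mirror of the Jinja macro: prefix custom schemas with the target schema."""
    if custom_schema_name is None:
        return target_schema
    return f"{target_schema}_{custom_schema_name}"


# A developer target (hypothetical schema dbt_alice) and a production target (hypothetical schema prod).
print(generate_schema_name(None, "dbt_alice"))       # model without +schema
print(generate_schema_name("staging", "dbt_alice"))  # model with +schema: staging
print(generate_schema_name("analytics", "prod"))     # model with +schema: analytics
```

Because the target schema is always used as a prefix, each developer builds into their own set of schemas and cannot overwrite production tables by accident.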

dbt Snapshots

-- snapshots/customers_snapshot.sql
{% snapshot customers_snapshot %}

{{
    config(
        target_schema='snapshots',
        unique_key='id',
        strategy='timestamp',
        updated_at='updated_at',
    )
}}

SELECT * FROM {{ source('production', 'customers') }}

{% endsnapshot %}
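
With the timestamp strategy, a snapshot keeps every version of a row and records its validity window in dbt_valid_from and dbt_valid_to. The sketch below is a simplified Python model of that behavior, not dbt's implementation: it ignores hard deletes and the other metadata columns dbt adds, and the function name `apply_snapshot` and the sample customer rows are invented.

```python
from datetime import datetime


def apply_snapshot(snapshot, source_rows, unique_key="id", updated_at="updated_at"):
    """Apply one snapshot run in place: close changed versions, append new ones."""
    current = {row[unique_key]: row for row in snapshot if row["dbt_valid_to"] is None}
    for src in source_rows:
        existing = current.get(src[unique_key])
        if existing is not None and src[updated_at] <= existing[updated_at]:
            continue  # no newer version in the source, nothing to record
        if existing is not None:
            existing["dbt_valid_to"] = src[updated_at]  # close the old version
        snapshot.append({**src, "dbt_valid_from": src[updated_at], "dbt_valid_to": None})


snapshot = []
v1 = {"id": 1, "email": "a@old.example", "updated_at": datetime(2024, 1, 1)}
v2 = {"id": 1, "email": "a@new.example", "updated_at": datetime(2024, 2, 1)}

apply_snapshot(snapshot, [v1])  # first run: one open row
apply_snapshot(snapshot, [v2])  # email changed: old row closed, new row opened
apply_snapshot(snapshot, [v2])  # rerun with no changes: nothing happens

for row in snapshot:
    print(row["email"], row["dbt_valid_from"], row["dbt_valid_to"])
```

The row whose dbt_valid_to is empty is the current version; all other rows are history.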

dbt Tests

-- tests/test_order_amount_positive.sql
-- Custom data test

SELECT
    order_id,
    amount
FROM {{ ref('fct_orders') }}
WHERE amount < 0
-- Should return 0 rows to pass

# models/marts/_marts__models.yml
version: 2

models:
  - name: fct_orders
    tests:
      - dbt_utils.expression_is_true:
          expression: "amount >= 0"
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - order_id
            - order_date
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: order_date
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: "'2020-01-01'"  # inner quotes make this a SQL date literal
              max_value: "'2030-12-31'"

Code Implementation

Complete dbt Project Example

-- ============================================================
-- STAGING LAYER
-- ============================================================

-- models/staging/stg_orders.sql
{{
    config(
        materialized='view',
        tags=['staging', 'orders']
    )
}}

SELECT
    id AS order_id,
    customer_id,
    amount,
    status,
    created_at AS order_created_at,
    updated_at AS order_updated_at,
    _fivetran_synced AS _loaded_at
FROM {{ source('production', 'orders') }}
WHERE deleted_at IS NULL

-- models/staging/stg_customers.sql
{{
    config(
        materialized='view',
        tags=['staging', 'customers']
    )
}}

SELECT
    id AS customer_id,
    email,
    name AS customer_name,
    created_at AS customer_created_at,
    updated_at AS customer_updated_at,
    _fivetran_synced AS _loaded_at
FROM {{ source('production', 'customers') }}
WHERE deleted_at IS NULL

-- models/staging/stg_products.sql
{{
    config(
        materialized='view',
        tags=['staging', 'products']
    )
}}

SELECT
    id AS product_id,
    name AS product_name,
    category,
    price,
    created_at AS product_created_at,
    _fivetran_synced AS _loaded_at
FROM {{ source('production', 'products') }}
WHERE deleted_at IS NULL

-- ============================================================
-- INTERMEDIATE LAYER
-- ============================================================

-- models/intermediate/int_orders_enriched.sql
{{
    config(
        materialized='ephemeral',
        tags=['intermediate']
    )
}}

WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),

customers AS (
    SELECT * FROM {{ ref('stg_customers') }}
),

enriched_orders AS (
    SELECT
        o.order_id,
        o.customer_id,
        c.customer_name,
        c.email AS customer_email,
        o.amount,
        o.status,
        o.order_created_at,
        o.order_updated_at,
        -- dbt_utils 1.0+ moved cross-database macros such as date_trunc to the dbt namespace
        CAST({{ dbt.date_trunc('day', 'o.order_created_at') }} AS DATE) AS order_date
    FROM orders o
    LEFT JOIN customers c ON o.customer_id = c.customer_id
)

SELECT * FROM enriched_orders

-- models/intermediate/int_customer_lifetime.sql
{{
    config(
        materialized='ephemeral',
        tags=['intermediate']
    )
}}

WITH orders AS (
    SELECT * FROM {{ ref('int_orders_enriched') }}
),

customer_lifetime AS (
    SELECT
        customer_id,
        customer_name,
        MIN(order_created_at) AS first_order_date,
        MAX(order_created_at) AS last_order_date,
        COUNT(DISTINCT order_id) AS total_orders,
        SUM(amount) AS lifetime_value,
        AVG(amount) AS avg_order_value
    FROM orders
    WHERE status = 'completed'
    GROUP BY customer_id, customer_name
)

SELECT * FROM customer_lifetime

-- ============================================================
-- MARTS LAYER
-- ============================================================

-- models/marts/fct_orders.sql
{{
    config(
        materialized='incremental',
        unique_key='order_id',
        incremental_strategy='merge',
        partition_by={
            "field": "order_date",
            "data_type": "date",
            "granularity": "day"
        },
        cluster_by=['customer_id', 'status'],
        tags=['mart', 'orders', 'daily']
    )
}}

WITH orders AS (
    SELECT * FROM {{ ref('int_orders_enriched') }}
),

final AS (
    SELECT
        order_id,
        customer_id,
        customer_name,
        customer_email,
        amount,
        status,
        order_date,
        order_created_at,
        order_updated_at,
        {{ current_timestamp() }} AS _dbt_loaded_at
    FROM orders
    
    {% if is_incremental() %}
    WHERE order_updated_at > (SELECT MAX(order_updated_at) FROM {{ this }})
    {% endif %}
)

SELECT * FROM final

-- models/marts/dim_customers.sql
{{
    config(
        materialized='incremental',
        unique_key='customer_id',
        incremental_strategy='merge',
        tags=['mart', 'customers']
    )
}}

WITH customer_lifetime AS (
    SELECT * FROM {{ ref('int_customer_lifetime') }}
),

customer_orders AS (
    SELECT
        customer_id,
        COUNT(DISTINCT order_id) AS recent_orders,
        SUM(amount) AS recent_value
    FROM {{ ref('fct_orders') }}
    WHERE order_date >= DATEADD(month, -3, CURRENT_DATE())
    GROUP BY customer_id
),

final AS (
    SELECT
        cl.customer_id,
        cl.customer_name,
        cl.first_order_date,
        cl.last_order_date,
        cl.total_orders,
        cl.lifetime_value,
        cl.avg_order_value,
        COALESCE(co.recent_orders, 0) AS recent_orders,
        COALESCE(co.recent_value, 0) AS recent_value,
        CASE
            WHEN cl.lifetime_value > 10000 THEN 'VIP'
            WHEN cl.lifetime_value > 1000 THEN 'Gold'
            WHEN cl.lifetime_value > 100 THEN 'Silver'
            ELSE 'Bronze'
        END AS customer_tier,
        {{ current_timestamp() }} AS _dbt_loaded_at
    FROM customer_lifetime cl
    LEFT JOIN customer_orders co ON cl.customer_id = co.customer_id
    
    {% if is_incremental() %}
    WHERE cl.last_order_date > (SELECT MAX(last_order_date) FROM {{ this }})
    {% endif %}
)

SELECT * FROM final

-- models/marts/rpt_daily_revenue.sql
{{
    config(
        materialized='table',
        partition_by={
            "field": "order_date",
            "data_type": "date",
            "granularity": "day"
        },
        cluster_by=['customer_tier'],
        tags=['mart', 'report', 'daily']
    )
}}

WITH orders AS (
    SELECT * FROM {{ ref('fct_orders') }}
),

daily_summary AS (
    SELECT
        order_date,
        COUNT(DISTINCT order_id) AS order_count,
        COUNT(DISTINCT customer_id) AS customer_count,
        SUM(amount) AS total_revenue,
        AVG(amount) AS avg_order_value,
        SUM(CASE WHEN status = 'completed' THEN amount ELSE 0 END) AS completed_revenue,
        SUM(CASE WHEN status = 'cancelled' THEN amount ELSE 0 END) AS cancelled_revenue
    FROM orders
    GROUP BY order_date
),

customer_segments AS (
    SELECT
        order_date,
        customer_tier,
        COUNT(DISTINCT customer_id) AS customers,
        SUM(amount) AS revenue
    FROM orders
    JOIN {{ ref('dim_customers') }} USING (customer_id)
    GROUP BY order_date, customer_tier
),

final AS (
    SELECT
        ds.order_date,
        ds.order_count,
        ds.customer_count,
        ds.total_revenue,
        ds.avg_order_value,
        ds.completed_revenue,
        ds.cancelled_revenue,
        cs.customer_tier,
        cs.customers AS segment_customers,
        cs.revenue AS segment_revenue,
        {{ current_timestamp() }} AS _dbt_loaded_at
    FROM daily_summary ds
    LEFT JOIN customer_segments cs ON ds.order_date = cs.order_date
)

SELECT * FROM final

dbt Commands

# Run all models
dbt run

# Run specific model
dbt run --select stg_orders

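# Run a model and everything upstream of it (its ancestors)
dbt run --select +fct_orders

# Run a model and everything downstream of it
dbt run --select fct_orders+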

# Run tests
dbt test

# Run the tests defined on a specific model
dbt test --select stg_orders

# Generate documentation
dbt docs generate

# Serve documentation
dbt docs serve

# Run snapshot
dbt snapshot

# Seed CSV files
dbt seed

# Clean project
dbt clean

# Compile models (without running)
dbt compile
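
The `+` graph operator in `--select` adds ancestors when it is placed before a model name and descendants when placed after it. The Python sketch below resolves such selectors over the model dependencies used in this project (stg_products is left out because nothing depends on it). The helper names are illustrative, and real dbt selectors support many more forms than this.

```python
# Parent -> children edges, read off the ref() calls in the example models.
CHILDREN = {
    "stg_orders": ["int_orders_enriched"],
    "stg_customers": ["int_orders_enriched"],
    "int_orders_enriched": ["int_customer_lifetime", "fct_orders"],
    "int_customer_lifetime": ["dim_customers"],
    "fct_orders": ["dim_customers", "rpt_daily_revenue"],
    "dim_customers": ["rpt_daily_revenue"],
    "rpt_daily_revenue": [],
}


def descendants(model):
    """All models that depend on `model`, directly or indirectly."""
    seen, stack = set(), [model]
    while stack:
        for child in CHILDREN[stack.pop()]:
            if child not in seen:
                seen.add(child)
                stack.append(child)
    return seen


def ancestors(model):
    """All models that `model` depends on, directly or indirectly."""
    return {m for m in CHILDREN if model in descendants(m)}


def select(selector):
    """Resolve 'model', '+model', 'model+' or '+model+' to a sorted list of models."""
    name = selector.strip("+")
    selected = {name}
    if selector.startswith("+"):
        selected |= ancestors(name)
    if selector.endswith("+"):
        selected |= descendants(name)
    return sorted(selected)


print(select("+fct_orders"))  # fct_orders plus everything it is built from
print(select("fct_orders+"))  # fct_orders plus everything built on top of it
```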

Production Tip: Use dbt's --select flag for targeted runs. During development, run only the models you changed. In production, run the full DAG and add --fail-fast so the run stops at the first error.


Common Follow-Up Questions

Q1: How do you handle late-arriving data in dbt?

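-- Reprocess a lookback window so rows that arrive late are still picked up.
-- With unique_key and the merge strategy, reprocessed rows are updated rather than duplicated.
-- The 3-day window is an example value; DATEADD is Snowflake syntax.
{% if is_incremental() %}
WHERE updated_at > (SELECT DATEADD(day, -3, MAX(updated_at)) FROM {{ this }})
{% endif %}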
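
One common way to catch late-arriving rows is to subtract a lookback window from the high-water mark, so that rows stamped slightly before the last load are reprocessed. The Python sketch below contrasts a strict filter with a lookback filter. The function name `rows_to_process`, the 3-day window and the sample rows are assumptions made for illustration.

```python
from datetime import datetime, timedelta


def rows_to_process(source_rows, max_loaded, lookback=timedelta(0)):
    """Rows whose updated_at is newer than the high-water mark minus the lookback."""
    cutoff = max_loaded - lookback
    return [row for row in source_rows if row["updated_at"] > cutoff]


# The target table currently holds data up to 2024-01-10 12:00.
max_loaded = datetime(2024, 1, 10, 12, 0)
source_rows = [
    # Ordinary new row, stamped after the high-water mark.
    {"order_id": 1, "updated_at": datetime(2024, 1, 10, 13, 0)},
    # Late arrival: stamped before the high-water mark but delivered after the last run.
    {"order_id": 2, "updated_at": datetime(2024, 1, 9, 23, 0)},
]

strict = rows_to_process(source_rows, max_loaded)
with_lookback = rows_to_process(source_rows, max_loaded, lookback=timedelta(days=3))

print([row["order_id"] for row in strict])         # the late row is missed
print([row["order_id"] for row in with_lookback])  # the late row is included
```

The lookback also re-selects rows that were already loaded, so it relies on an upsert by unique_key to stay free of duplicates.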

Q2: How do you test data quality in dbt?

# Use dbt_expectations for advanced tests
- name: fct_orders
  columns:
    - name: order_id
      tests:
        - dbt_expectations.expect_column_values_to_not_be_null
        - dbt_expectations.expect_column_values_to_be_unique
    - name: customer_id
      tests:
        - dbt_expectations.expect_column_values_to_not_be_null
    - name: amount
      tests:
        - dbt_expectations.expect_column_values_to_not_be_null
        - dbt_expectations.expect_column_values_to_be_between:
            min_value: 0
            max_value: 1000000

Q3: How do you handle schema evolution in dbt?

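-- Views and tables are rebuilt on every run, so they always reflect the current model SQL
-- Incremental models ignore schema changes by default (on_schema_change='ignore')
-- To add new columns to an existing incremental table, set on_schema_change
-- (or use 'sync_all_columns' to also remove columns that were dropped from the model):
-- {{ config(materialized='incremental', on_schema_change='append_new_columns') }}

-- To rebuild an incremental table from scratch, use dbt's --full-refresh flag
dbt run --select fct_orders --full-refresh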

Q4: How do you schedule dbt in production?

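# Airflow DAG for dbt (assumes Airflow 2.4+, where `schedule` replaces `schedule_interval`)
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    'dbt_pipeline',
    schedule='0 6 * * *',             # daily at 06:00
    start_date=datetime(2024, 1, 1),  # fixed start date instead of the deprecated days_ago()
    catchup=False,                    # do not backfill runs for past dates
) as dag: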
    
    dbt_run = BashOperator(
        task_id='dbt_run',
        bash_command='cd /opt/dbt && dbt run --fail-fast',
    )
    
    dbt_test = BashOperator(
        task_id='dbt_test',
        bash_command='cd /opt/dbt && dbt test --fail-fast',
    )
    
    dbt_run >> dbt_test

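Critical Consideration: Design dbt models to be idempotent, so that running them multiple times on the same input produces the same result. Set unique_key on incremental models that use merge or delete+insert so reruns update existing rows instead of duplicating them, and avoid non-deterministic logic.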


Company-Specific Tips

dbt Labs Interview Tips

  • Discuss dbt architecture and design principles
  • Explain materializations and when to use each
  • Mention testing strategies
  • Talk about packages and macros

Snowflake Interview Tips

  • Focus on dbt + Snowflake integration
  • Discuss incremental models with Snowflake
  • Mention time travel for testing
  • Talk about clustering in dbt models

Data Analytics Interview Tips

  • Explain analytics engineering role
  • Discuss data modeling with dbt
  • Mention documentation as code
  • Talk about CI/CD for dbt

Final Takeaway: dbt is a widely adopted standard for SQL-based transformations. Use staging for cleaning, intermediate for business logic, and marts for analytics. Always test your models, document your work, and use incremental models for large tables.
