dbt with PostgreSQL
PostgreSQL Architecture
Architecture Diagram
āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
ā POSTGRESQL + DBT ARCHITECTURE ā
āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā¤
ā ā
ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā
ā ā POSTGRESQL COMPONENTS ā ā
ā ā ā ā
ā ā āāāāāāāāāāāāāāāā āāāāāāāāāāāāāāāā āāāāāāāāāāāāāāāāāāāāāāāāāāāā ā ā
ā ā ā DATABASES ā ā SCHEMAS ā ā TABLES ā ā ā
ā ā ā ā ā ā ā ā ā ā
ā ā ā ⢠Logical ā ā ⢠Namespace ā ā ⢠Heap tables ā ā ā
ā ā ā containers ā ā ⢠Access ā ā ⢠Temporary ā ā ā
ā ā ā ⢠Shared ā ā control ā ā ⢠Unlogged ā ā ā
ā ā āāāāāāāāāāāāāāāā āāāāāāāāāāāāāāāā āāāāāāāāāāāāāāāāāāāāāāāāāāāā ā ā
ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā
ā ā ā
ā ā¼ ā
ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā
ā ā POSTGRESQL FEATURES ā ā
ā ā ā ā
ā ā āāāāāāāāāāāāāāāā āāāāāāāāāāāāāāāā āāāāāāāāāāāāāāāāāāāāāāāāāāāā ā ā
ā ā ā EXTENSIONS ā ā INDEXING ā ā PARTITIONING ā ā ā
ā ā ā ā ā ā ā ā ā ā
ā ā ā ⢠PostGIS ā ā ⢠B-tree ā ā ⢠Range ā ā ā
ā ā ā ⢠pg_trgm ā ā ⢠Hash ā ā ⢠List ā ā ā
ā ā ā ⢠hstore ā ā ⢠GiST ā ā ⢠Hash ā ā ā
ā ā āāāāāāāāāāāāāāāā āāāāāāāāāāāāāāāā āāāāāāāāāāāāāāāāāāāāāāāāāāāā ā ā
ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā
ā ā
āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
Indexing Strategy
Architecture Diagram
āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
ā INDEXING STRATEGY ā
āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā¤
ā ā
ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā
ā ā INDEX TYPES ā ā
ā ā ā ā
ā ā āāāāāāāāāāāāāāāā āāāāāāāāāāāāāāāā āāāāāāāāāāāāāāāāāāāāāāāāāāāā ā ā
ā ā ā B-TREE ā ā HASH ā ā GiST ā ā ā
ā ā ā ā ā ā ā ā ā ā
ā ā ā ⢠Default ā ā ⢠Equality ā ā ⢠Geometric ā ā ā
ā ā ā ⢠Range ā ā ⢠Exact ā ā ⢠Full-text ā ā ā
ā ā ā queries ā ā matches ā ā ⢠Array ā ā ā
ā ā āāāāāāāāāāāāāāāā āāāāāāāāāāāāāāāā āāāāāāāāāāāāāāāāāāāāāāāāāāāā ā ā
ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā
ā ā ā
ā ā¼ ā
ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā
ā ā INDEX USAGE ā ā
ā ā ā ā
ā ā Query: SELECT * FROM orders WHERE customer_id = 1001 ā ā
ā ā ā ā
ā ā Without index: ā ā
ā ā āāā Sequential scan: 100M rows ā ā
ā ā āāā Time: 30 seconds ā ā
ā ā āāā Cost: 1000 ā ā
ā ā ā ā
ā ā With B-tree index on customer_id: ā ā
ā ā āāā Index scan: 1000 rows ā ā
ā ā āāā Time: 0.01 seconds ā ā
ā ā āāā Cost: 10 ā ā
ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā
ā ā
āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
Partitioning Strategy
Architecture Diagram
āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
ā PARTITIONING STRATEGY ā
āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā¤
ā ā
ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā
ā ā PARTITION TYPES ā ā
ā ā ā ā
ā ā āāāāāāāāāāāāāāāā āāāāāāāāāāāāāāāā āāāāāāāāāāāāāāāāāāāāāāāāāāāā ā ā
ā ā ā RANGE ā ā LIST ā ā HASH ā ā ā
ā ā ā ā ā ā ā ā ā ā
ā ā ā ⢠Date ā ā ⢠Category ā ā ⢠Uniform ā ā ā
ā ā ā ranges ā ā ⢠Region ā ā distribution ā ā ā
ā ā ā ⢠Numeric ā ā ⢠Status ā ā ⢠High-cardinality ā ā ā
ā ā āāāāāāāāāāāāāāāā āāāāāāāāāāāāāāāā āāāāāāāāāāāāāāāāāāāāāāāāāāāā ā ā
ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā
ā ā ā
ā ā¼ ā
ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā
ā ā PARTITION EXAMPLE ā ā
ā ā ā ā
ā ā CREATE TABLE orders ( ā ā
ā ā order_id integer, ā ā
ā ā customer_id integer, ā ā
ā ā order_date date, ā ā
ā ā amount decimal(10,2) ā ā
ā ā ) PARTITION BY RANGE (order_date); ā ā
ā ā ā ā
ā ā CREATE TABLE orders_2024_01 PARTITION OF orders ā ā
ā ā FOR VALUES FROM ('2024-01-01') TO ('2024-02-01'); ā ā
ā ā ā ā
ā ā CREATE TABLE orders_2024_02 PARTITION OF orders ā ā
ā ā FOR VALUES FROM ('2024-02-01') TO ('2024-03-01'); ā ā
ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā
ā ā
āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
Detailed Explanation
PostgreSQL is a powerful, open-source object-relational database with extensibility and SQL compliance.
PostgreSQL + dbt Features
- Extensions: Add functionality (PostGIS, pg_trgm)
- Indexing: Multiple index types for optimization
- Partitioning: Table partitioning for large datasets
- Full-text Search: Built-in text search capabilities
- JSON Support: Native JSON/JSONB data types
Indexing in PostgreSQL
PostgreSQL supports several index types:
- B-tree: Default, good for range and equality
- Hash: Good for exact equality matches
- GiST: Good for geometric and full-text search
- GIN: Good for array and full-text search
- BRIN: Good for large tables with natural ordering
Partitioning in PostgreSQL
PostgreSQL provides declarative partitioning:
- Range partitioning: Partition by value ranges
- List partitioning: Partition by explicit values
- Hash partitioning: Partition by hash of column
Best Practices for PostgreSQL
- Create appropriate indexes - Match query patterns
- Use partitioning - For large tables
- Analyze tables regularly - Update statistics
- Vacuum tables - Reclaim space
- Use connection pooling - Manage connections
- Monitor performance - Use pg_stat tables
- Use extensions - Add functionality as needed
- Implement archival - Archive old data
Code Examples
PostgreSQL Profile Configuration
# profiles.yml
my_profile:
target: dev
outputs:
dev:
type: postgres
host: localhost
port: 5432
user: my_user
password: "{{ env_var('POSTGRES_PASSWORD') }}"
dbname: analytics_dev
schema: dbt_dev
threads: 4
connect_timeout: 10
sslmode: prefer
prod:
type: postgres
host: my-rds-instance.xxxxxxxxxxxx.us-west-2.rds.amazonaws.com
port: 5432
user: service_account
password: "{{ env_var('POSTGRES_PASSWORD') }}"
dbname: analytics_prod
schema: public
threads: 8
connect_timeout: 30
sslmode: require
Index Creation
-- macros/postgres/create_indexes.sql
{% macro create_indexes(table_name, columns, index_type='btree') %}
{% for column in columns %}
create index if not exists idx_{{ table_name }}_{{ column }}
on {{ table_name }} using {{ index_type }} ({{ column }});
{% endfor %}
{% endmacro %}
-- Usage in model
{{
config(
materialized='table',
post_hook=[
"{{ create_indexes(this, ['customer_id', 'order_date']) }}"
]
)
}}
Partitioned Table
-- models/marts/fct_orders.sql
{{
config(
materialized='incremental',
unique_key='order_id',
incremental_strategy='merge',
partition_by={
"field": "order_date",
"data_type": "date"
}
)
}}
with orders as (
select * from {{ ref('stg_orders') }}
),
final as (
select
order_id,
customer_id,
order_date,
order_status,
amount,
current_timestamp() as updated_at
from orders
)
select * from final
{% if is_incremental() %}
where order_date >= date_sub(
(select max(order_date) from {{ this }}),
interval 7 day
)
{% endif %}
PostgreSQL Extensions
-- macros/postgres/enable_extensions.sql
{% macro enable_extension(extension_name) %}
create extension if not exists {{ extension_name }};
{% endmacro %}
-- Enable PostGIS
{{ enable_extension('postgis') }}
-- Enable pg_trgm for fuzzy matching
{{ enable_extension('pg_trgm') }}
Full-Text Search
-- macros/postgres/full_text_search.sql
{% macro full_text_search(column_name, search_term) %}
to_tsvector('english', {{ column_name }}) @@ plainto_tsquery('english', '{{ search_term }}')
{% endmacro %}
-- Usage in model
select *
from {{ ref('articles') }}
where {{ full_text_search('content', 'dbt tutorial') }}
JSON Operations
-- macros/postgres/json_extract.sql
{% macro json_extract(column_name, path) %}
{{ column_name }}::json->'{{ path }}'
{% endmacro %}
{% macro json_extract_text(column_name, path) %}
{{ column_name }}::json->>'{{ path }}'
{% endmacro %}
-- Usage in model
select
id,
{{ json_extract_text('metadata', 'author') }} as author,
{{ json_extract_text('metadata', 'tags') }} as tags
from {{ ref('articles') }}
Optimized PostgreSQL Model
-- models/marts/fct_orders_optimized.sql
{{
config(
materialized='incremental',
unique_key='order_id',
incremental_strategy='merge',
post_hook=[
"vacuum analyze {{ this }}",
"create index if not exists idx_{{ this.name }}_customer_id on {{ this }} (customer_id)",
"create index if not exists idx_{{ this.name }}_order_date on {{ this }} (order_date)"
]
)
}}
with orders as (
select * from {{ ref('stg_orders') }}
),
final as (
select
order_id,
customer_id,
order_date,
order_status,
amount,
current_timestamp() as updated_at
from orders
)
select * from final
{% if is_incremental() %}
where updated_at > (select max(updated_at) from {{ this }})
{% endif %}
Performance Metrics
| Metric | Description | Target |
|---|---|---|
| Query Time | Average query execution | <5s |
| Index Creation | Time to create indexes | <1min |
| Vacuum Time | Time to vacuum tables | <5min |
| Connection Count | Active connections | <100 |
| Cache Hit Ratio | Buffer cache hits | >99% |
Best Practices
- Create appropriate indexes - Match query patterns
- Use partitioning - For large tables
- Analyze tables regularly - Update statistics
- Vacuum tables - Reclaim space
- Use connection pooling - Manage connections
- Monitor performance - Use pg_stat tables
- Use extensions - Add functionality as needed
- Implement archival - Archive old data