Dataform for SQL-Based Data Transformation
Dataform Overview
Dataform is GCP's managed service for SQL-based data transformation. It's similar to dbt but fully integrated with BigQuery and GCP.
Core Concepts
SQLX Files:
- Combine SQL with JavaScript templating
- Define data transformations
- Support incremental models
- Enable dynamic SQL generation
Actions:
- Tables: Materialized tables in BigQuery
- Views: Virtual tables for logic separation
- Assertions: Data quality tests
- Operations: Custom SQL scripts
Dependencies:
- Automatic dependency resolution
- DAG-based execution order
- Incremental processing support
- Parallel execution
Project Structure
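Directory Layout
The examples below assume this layout: dataform.json and package.json at the repository root, SQLX files under definitions/ (sources/, staging/, intermediate/, marts/, tests/, operations/), and shared JavaScript under includes/.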
🏗️ GCP Data Engineering Reference Architecture
Interview Tip: GCP's data engineering stack is serverless-first. Dataflow (Apache Beam) handles both streaming and batch. BigQuery is the flagship analytics service.
Configuration Files (Dataform core 2.x; Dataform core 3.x replaces dataform.json with workflow_settings.yaml)
// dataform.json
{
  "warehouse": "bigquery",
  "defaultSchema": "dataform",
  "defaultDatabase": "my-project",
  "defaultLocation": "US",
  "assertionSchema": "dataform_assertions",
  "vars": {
    "environment": "production",
    "project_id": "my-project",
    "dataset": "analytics"
  }
}
// package.json
{
  "name": "my-dataform-project",
  "version": "1.0.0",
  "description": "Dataform transformation project",
  "dependencies": {
    "@dataform/core": "2.0.0",
    "@dataform/assertions": "2.0.0"
  }
}
SQLX Files
SQLX files combine SQL with JavaScript templating for dynamic transformations.
Basic SQLX Structure
-- definitions/sources/raw_sales.sqlx
-- Registers the existing table my-project.raw.sales as a Dataform source so
-- downstream actions can depend on it via ${ref("sales")}.
config {
  type: "declaration",
  name: "sales",
  schema: "raw",
  database: "my-project",
  description: "Raw sales data from source system"
}
Staging Layer
-- definitions/staging/stg_sales.sqlx
config {
  type: "view",
  schema: "staging",
  description: "Staged sales data with standardization"
}

-- Standardize types and casing of the raw sales feed.
-- NOTE(review): the SAFE_CASTs suggest raw columns may arrive as STRING;
-- confirm against the source schema.
SELECT
  sale_id,
  TRIM(UPPER(product_id)) AS product_id,
  TRIM(LOWER(customer_id)) AS customer_id,
  SAFE_CAST(quantity AS INT64) AS quantity,
  SAFE_CAST(amount AS FLOAT64) AS amount,
  SAFE_CAST(sale_date AS DATE) AS sale_date,
  TRIM(UPPER(region)) AS region,
  -- This is a view, so CURRENT_TIMESTAMP() is evaluated whenever the view is
  -- queried: it records when a downstream table was built, not ingestion time.
  CURRENT_TIMESTAMP() AS loaded_at
FROM ${ref("sales")}
WHERE sale_id IS NOT NULL
  -- WHERE sees the raw columns, not the SELECT aliases, so apply the same
  -- cast here; otherwise a STRING raw amount cannot be compared with 0.
  -- Rows whose amount fails to cast (NULL) are excluded as well.
  AND SAFE_CAST(amount AS FLOAT64) > 0
Intermediate Layer
-- definitions/intermediate/int_sales_enriched.sqlx
config {
  type: "table",
  schema: "intermediate",
  description: "Sales data enriched with customer and product information",
  tags: ["daily", "sales"]
}

-- Each input CTE pulls only the columns the final projection uses.
WITH staged_sales AS (
  SELECT sale_id, product_id, customer_id, quantity, amount, sale_date, region, loaded_at
  FROM ${ref("stg_sales")}
),

customer_dim AS (
  SELECT customer_id, customer_name, customer_segment
  FROM ${ref("stg_customers")}
),

product_dim AS (
  SELECT product_id, product_name, category
  FROM ${ref("stg_products")}
)

-- LEFT JOINs keep every sale even when its customer or product is missing.
-- NOTE(review): duplicate keys in either dimension would fan out sale rows;
-- a uniqueness assertion on sale_id over this table would catch that.
SELECT
  ss.sale_id,
  ss.product_id,
  pd.product_name,
  pd.category,
  ss.customer_id,
  cd.customer_name,
  cd.customer_segment,
  ss.quantity,
  ss.amount,
  ss.sale_date,
  ss.region,
  -- Evaluated when the table is built, so it goes stale between rebuilds.
  DATE_DIFF(CURRENT_DATE(), ss.sale_date, DAY) AS days_since_sale,
  ss.loaded_at
FROM staged_sales AS ss
LEFT JOIN customer_dim AS cd
  ON cd.customer_id = ss.customer_id
LEFT JOIN product_dim AS pd
  ON pd.product_id = ss.product_id
Marts Layer
-- definitions/marts/sales_daily_summary.sqlx
-- No explicit `dependencies` entry is needed: ${ref("int_sales_enriched")}
-- below already registers the dependency on the intermediate table.
config {
  type: "table",
  schema: "marts",
  description: "Daily sales summary for reporting",
  tags: ["daily", "reporting"]
}

-- Aggregate the trailing 365 days of enriched sales per day, region,
-- product category and customer segment.
WITH daily_sales AS (
  SELECT
    sale_date,
    region,
    category,
    customer_segment,
    COUNT(DISTINCT sale_id) AS transaction_count,
    -- COUNT(DISTINCT ...) ignores NULL customer_ids
    COUNT(DISTINCT customer_id) AS unique_customers,
    SUM(quantity) AS total_quantity,
    SUM(amount) AS total_amount,
    AVG(amount) AS avg_order_value,
    MIN(amount) AS min_order_value,
    MAX(amount) AS max_order_value
  FROM ${ref("int_sales_enriched")}
  WHERE sale_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 365 DAY)
  GROUP BY sale_date, region, category, customer_segment
)

SELECT
  sale_date,
  region,
  category,
  customer_segment,
  transaction_count,
  unique_customers,
  total_quantity,
  total_amount,
  avg_order_value,
  min_order_value,
  max_order_value,
  -- SAFE_DIVIDE yields NULL rather than an error when the denominator is 0
  SAFE_DIVIDE(total_amount, unique_customers) AS revenue_per_customer,
  SAFE_DIVIDE(total_amount, transaction_count) AS revenue_per_transaction
FROM daily_sales
Assertions
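Assertions validate data quality and business rules. Each assertion is a query that returns the rows violating a rule; the assertion fails if the query returns any rows.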
Basic Assertions
-- definitions/tests/assert_unique_sale_id.sqlx
config {
  type: "assertion",
  schema: "dataform_assertions",
  description: "Assert that sale_id is unique"
}

-- One row per sale_id that occurs more than once; any row fails the assertion.
SELECT sale_id
FROM (
  SELECT
    sale_id,
    COUNT(*) AS occurrences
  FROM ${ref("int_sales_enriched")}
  GROUP BY sale_id
)
WHERE occurrences > 1
Not Null Assertions
-- definitions/tests/assert_not_null_amount.sqlx
config {
  type: "assertion",
  schema: "dataform_assertions",
  description: "Assert that amount is not null"
}

-- Returns every sale whose amount is NULL; any row fails the assertion.
-- sale_id is included so a failure identifies the offending rows
-- (the amount column alone would only ever show NULL).
SELECT
  sale_id,
  amount
FROM ${ref("int_sales_enriched")}
WHERE amount IS NULL
Range Assertions
-- definitions/tests/assert_valid_amounts.sqlx
config {
  type: "assertion",
  schema: "dataform_assertions",
  description: "Assert that amounts are within valid range"
}

-- Valid amounts are strictly positive, matching the staging rule
-- `amount > 0`, and at most 1,000,000.
-- NOTE(review): the 1,000,000 ceiling looks like a business threshold;
-- confirm its units/currency.
-- sale_id is returned so failures identify the offending rows.
SELECT
  sale_id,
  amount
FROM ${ref("int_sales_enriched")}
WHERE amount <= 0
  OR amount > 1000000
Custom Assertions
-- definitions/tests/assert_referential_integrity.sqlx
config {
  type: "assertion",
  schema: "dataform_assertions",
  description: "Assert referential integrity between sales and customers"
}

-- Distinct customer_ids in enriched sales with no matching stg_customers row.
-- A NULL customer_id can never match, so it is reported as an orphan too.
SELECT DISTINCT s.customer_id
FROM ${ref("int_sales_enriched")} AS s
WHERE NOT EXISTS (
  SELECT 1
  FROM ${ref("stg_customers")} AS c
  WHERE c.customer_id = s.customer_id
)
Operations
Operations run arbitrary SQL statements (for example DELETE or CREATE TABLE) that Dataform does not manage as a table or view.
Data Operations
-- definitions/operations/cleanup_old_data.sqlx
config {
  type: "operations",
  schema: "operations",
  description: "Clean up old data from the raw sales table",
  tags: ["cleanup", "maintenance"]
}

-- Delete raw sales older than 7 years.
-- - ref("sales") is the name the raw sales declaration is registered under;
--   ref("raw_sales") does not resolve to any action.
-- - INTERVAL 7 YEAR is calendar-exact, whereas 2555 DAY drifts with leap years.
-- - BigQuery has no ALTER TABLE ... DROP PARTITION. If the table is
--   partitioned on sale_date, a DELETE whose predicate covers whole
--   partitions lets BigQuery drop those partitions.
-- NOTE(review): staging SAFE_CASTs sale_date to DATE; if the raw column is
-- STRING this comparison needs the same cast.
DELETE FROM ${ref("sales")}
WHERE sale_date < DATE_SUB(CURRENT_DATE(), INTERVAL 7 YEAR);
Schema Operations
-- definitions/operations/create_tables.sqlx
-- hasOutput: true registers the table this operation creates as an action
-- output, so ${self()} names it and other actions can ref("sales_partitioned").
-- (Referencing the table via ref() from inside its own creating action does
-- not resolve.)
-- A type: "table" action with bigquery { partitionBy, clusterBy } is the
-- Dataform-managed alternative to hand-written DDL.
config {
  type: "operations",
  schema: "operations",
  name: "sales_partitioned",
  hasOutput: true,
  description: "Create tables with partitioning and clustering"
}

-- Create an empty partitioned and clustered table with int_sales_enriched's schema
CREATE TABLE IF NOT EXISTS ${self()}
PARTITION BY sale_date
CLUSTER BY region, category
AS
SELECT *
FROM ${ref("int_sales_enriched")}
WHERE FALSE;
Incremental Models
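Incremental models process only new or changed data: the first run (or a full refresh) builds the table from the whole query, and later runs insert or merge only the rows selected by the incremental filter.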
Basic Incremental Model
-- definitions/marts/sales_incremental.sqlx
-- Setting uniqueKey makes Dataform MERGE each run's rows into the table,
-- updating rows whose sale_id already exists; Dataform has no separate
-- incrementalStrategy option.
config {
  type: "incremental",
  schema: "marts",
  description: "Incremental sales processing",
  uniqueKey: ["sale_id"]
}

SELECT
  sale_id,
  product_id,
  customer_id,
  quantity,
  amount,
  sale_date,
  region,
  CURRENT_TIMESTAMP() AS updated_at
FROM ${ref("int_sales_enriched")}
-- On incremental runs, re-read from the latest loaded date onward:
-- - `>=` (not `>`) picks up late-arriving rows for that date, and the
--   uniqueKey MERGE prevents re-read rows from being duplicated.
-- - COALESCE handles an existing but empty target, where MAX() is NULL and
--   would otherwise select nothing.
-- On the first run or a full refresh, when() emits nothing and the whole
-- source is read.
${when(incremental(), `WHERE sale_date >= COALESCE((SELECT MAX(sale_date) FROM ${self()}), DATE '1970-01-01')`)}
Incremental Strategies
# Incremental behaviour in Dataform (type: "incremental") is selected by config,
# not by a named strategy:
#   - uniqueKey set     -> MERGE: insert new rows, update rows whose key exists
#   - uniqueKey not set -> append: INSERT the selected rows only
# Dataform has no 'delete+insert' strategy and no 'incrementalStrategy' or
# 'updateHint' option.
strategies = {
    'merge': 'uniqueKey set: insert new rows and update existing rows',
    'append': 'uniqueKey not set: insert new rows only',
}

# Example: merge on a composite key. On BigQuery, updatePartitionFilter limits
# how much of the target table the MERGE has to scan.
config = {
    'type': 'incremental',
    'uniqueKey': ['id', 'date'],
    'bigquery': {
        'updatePartitionFilter': 'date >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)',
    },
}
JavaScript Templating
Use JavaScript for dynamic SQL generation.
JavaScript Functions
-- definitions/marts/dynamic_sales.sqlx
config {
  type: "table",
  schema: "marts",
  description: "Dynamic sales table with configurable filters"
}

-- The look-back window depends on the `environment` project variable
-- (the "vars" block of dataform.json): 30 days in production, 7 days otherwise.
-- `if` is a reserved word in JavaScript and cannot be called inside ${...};
-- Dataform's built-in when(condition, trueSql, falseSql) does this.
WITH filtered_sales AS (
  SELECT
    sale_date,
    region,
    amount
  FROM ${ref("int_sales_enriched")}
  ${when(
    dataform.projectConfig.vars.environment === "production",
    "WHERE sale_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)",
    "WHERE sale_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)"
  )}
)

SELECT
  sale_date,
  region,
  SUM(amount) AS total_amount,
  COUNT(*) AS transaction_count
FROM filtered_sales
GROUP BY sale_date, region
JavaScript Includes
// includes/functions.js
module.exports = {
formatDate: function(dateColumn, format) {
switch(format) {
case 'YYYY-MM-DD':
return `FORMAT_DATE('%Y-%m-%d', ${dateColumn})`;
case 'YYYYMMDD':
return `FORMAT_DATE('%Y%m%d', ${dateColumn})`;
default:
return dateColumn;
}
},
calculateAge: function(birthDate, referenceDate) {
return `DATE_DIFF(${referenceDate}, ${birthDate}, YEAR)`;
},
safeDivide: function(numerator, denominator) {
return `SAFE_DIVIDE(${numerator}, ${denominator})`;
}
};
Scheduling
Schedule Dataform runs using Cloud Composer, Cloud Scheduler, or Dataform's built-in workflow configurations.
Cloud Composer Integration
# Airflow DAG for Dataform scheduling: compile the repository at `main`, then
# run the actions tagged "daily" plus their transitive dependencies.
from airflow import DAG
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)
from datetime import datetime, timedelta

# Dataform repository coordinates shared by both tasks.
PROJECT_ID = 'my-project'
REGION = 'us-central1'
REPOSITORY_ID = 'my-dataform-repo'

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    # NOTE(review): only takes effect if an 'email' recipient is configured.
    'email_on_failure': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'dataform_daily_pipeline',
    default_args=default_args,
    description='Daily Dataform execution',
    schedule_interval='0 6 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

# Compile Dataform project. The Dataform operators take `region`, not `location`.
compile_task = DataformCreateCompilationResultOperator(
    task_id='compile_dataform',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result={
        'git_commitish': 'main',
    },
    dag=dag,
)

# Invoke workflow. The compile task pushes the compilation result as a dict;
# the API wants its resource name, hence ['name'].
invoke_task = DataformCreateWorkflowInvocationOperator(
    task_id='invoke_dataform',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation={
        'compilation_result': "{{ task_instance.xcom_pull(task_ids='compile_dataform')['name'] }}",
        'invocation_config': {
            'included_tags': ['daily'],
            'transitive_dependencies_included': True,
        },
    },
    dag=dag,
)

compile_task >> invoke_task
Cloud Scheduler
# Create a Cloud Scheduler job that starts a Dataform workflow invocation
# from an existing workflow configuration every day at 06:00.
# - The Dataform API starts runs by POSTing to the workflowInvocations
#   collection; there is no ".../workflows/<name>:invoke" method.
# - *.googleapis.com endpoints expect an OAuth access token, so use
#   --oauth-service-account-email rather than an OIDC identity token.
gcloud scheduler jobs create http dataform-daily-run \
  --location=us-central1 \
  --schedule="0 6 * * *" \
  --uri="https://dataform.googleapis.com/v1beta1/projects/my-project/locations/us-central1/repositories/my-repo/workflowInvocations" \
  --http-method=POST \
  --headers="Content-Type=application/json" \
  --message-body='{"workflowConfig": "projects/my-project/locations/us-central1/repositories/my-repo/workflowConfigs/my-workflow"}' \
  --oauth-service-account-email=my-service-account@my-project.iam.gserviceaccount.com
Best Practices
- Use layered architecture - Sources, Staging, Intermediate, Marts
- Implement assertions - Validate data quality at each layer
- Use incremental models - Optimize processing for large datasets
- Version control - Store all code in Git repositories
- Document everything - Use descriptions and tags
- Test regularly - Run assertions on every execution
- Monitor performance - Track execution times and resource usage