Dataform for SQL-Based Data Transformation

🟒 Free Lesson

Advertisement

Dataform for SQL-Based Data Transformation

[Figure: Dataform project flow. Sources (external tables), Staging (raw data views), Intermediate (transformed data), Marts (business tables), Tests (data quality). Building blocks: SQLX files (SQL + JavaScript), Assertions (data validation), Operations (custom scripts), Scheduling (automated runs).]

Dataform Overview

Dataform is GCP's managed service for SQL-based data transformation. It's similar to dbt but fully integrated with BigQuery and GCP.

Core Concepts

SQLX Files:

  • Combine SQL with JavaScript templating
  • Define data transformations
  • Support incremental models
  • Enable dynamic SQL generation

Actions:

  • Tables: Materialized tables in BigQuery
  • Views: Virtual tables for logic separation
  • Assertions: Data quality tests
  • Operations: Custom SQL scripts

Dependencies:

  • Automatic dependency resolution
  • DAG-based execution order
  • Incremental processing support
  • Parallel execution
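
To make the dependency bullets concrete, here is a minimal Python sketch (not Dataform's actual scheduler) that orders this lesson's actions with the standard-library graphlib module. The edges for stg_customers and stg_products are assumptions, since their sources are not shown in the lesson. Actions that land in the same wave do not depend on each other, so they could run in parallel.

```python
from graphlib import TopologicalSorter

# Each action maps to the actions it ref()s (its upstream dependencies).
# stg_customers and stg_products are given no upstream here: an assumption,
# because the lesson does not show their source declarations.
DEPENDENCIES = {
    "sales": set(),
    "stg_sales": {"sales"},
    "stg_customers": set(),
    "stg_products": set(),
    "int_sales_enriched": {"stg_sales", "stg_customers", "stg_products"},
    "sales_daily_summary": {"int_sales_enriched"},
    "assert_unique_sale_id": {"int_sales_enriched"},
}


def execution_waves(dependencies):
    """Group actions into waves; every action in a wave is ready at the same time."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises CycleError if the graph is not a DAG
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)  # marking nodes done unlocks their dependents
    return waves


WAVES = execution_waves(DEPENDENCIES)
for number, wave in enumerate(WAVES, start=1):
    print(f"wave {number}: {', '.join(wave)}")
```

The mart and the assertion end up in the same final wave because both only need int_sales_enriched to exist.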

Project Structure

Directory Layout
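
The file paths used in this lesson's examples imply a layout with a definitions/ folder per layer plus an includes/ folder. As a rough sketch, the Python snippet below derives that tree from those paths; it lists only files named in the lesson, so a real project would contain more.

```python
from pathlib import PurePosixPath

# File paths exactly as they appear in this lesson's examples.
LESSON_FILES = [
    "dataform.json",
    "package.json",
    "definitions/sources/raw_sales.sqlx",
    "definitions/staging/stg_sales.sqlx",
    "definitions/intermediate/int_sales_enriched.sqlx",
    "definitions/marts/sales_daily_summary.sqlx",
    "definitions/tests/assert_unique_sale_id.sqlx",
    "definitions/operations/cleanup_old_data.sqlx",
    "includes/functions.js",
]


def build_tree(paths):
    """Fold a list of paths into nested dicts (files map to empty dicts)."""
    tree = {}
    for path in paths:
        node = tree
        for part in PurePosixPath(path).parts:
            node = node.setdefault(part, {})
    return tree


def render(tree, indent=0):
    """Return the tree as indented lines, directories marked with a slash."""
    lines = []
    for name in sorted(tree):
        children = tree[name]
        suffix = "/" if children else ""
        lines.append("  " * indent + name + suffix)
        lines.extend(render(children, indent + 1))
    return lines


LAYOUT = render(build_tree(LESSON_FILES))
print("\n".join(LAYOUT))
```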

[Figure: GCP Data Engineering Reference Architecture. Data sources (on-prem databases, SaaS APIs, IoT sensors, mobile apps, REST APIs) feed an ingestion layer (Dataflow CDC, Pub/Sub, Cloud Tasks, Storage Transfer, Transfer Appliance), then a raw data zone in Cloud Storage (landing, bronze, archive, raw, staging), a processing layer (Dataflow, Dataproc, Cloud Functions, Data Prep, Cloud Composer), and a curated data zone (silver, gold, aggregates, features) served by BigQuery, Looker, Vertex AI, Data Studio, and Dataplex.]
Interview Tip: GCP's data engineering stack is serverless-first. Dataflow (Apache Beam) handles both streaming and batch. BigQuery is the flagship analytics service.

Configuration Files

// dataform.json
{
  "warehouse": "bigquery",
  "defaultSchema": "dataform",
  "defaultDatabase": "my-project",
  "defaultLocation": "US",
  "assertionSchema": "dataform_assertions",
  "vars": {
    "environment": "production",
    "project_id": "my-project",
    "dataset": "analytics"
  }
}

// package.json
{
  "name": "my-dataform-project",
  "version": "1.0.0",
  "description": "Dataform transformation project",
  "dependencies": {
    "@dataform/core": "2.0.0",
    "@dataform/assertions": "2.0.0"
  }
}

SQLX Files

SQLX files combine SQL with JavaScript templating for dynamic transformations.

Basic SQLX Structure

-- definitions/sources/raw_sales.sqlx
config {
  type: "declaration",
  database: "my-project",
  schema: "raw",
  name: "sales",
  description: "Raw sales data from source system"
}

Staging Layer

-- definitions/staging/stg_sales.sqlx
config {
  type: "view",
  schema: "staging",
  description: "Staged sales data with standardization"
}

SELECT
  sale_id,
  TRIM(UPPER(product_id)) as product_id,
  TRIM(LOWER(customer_id)) as customer_id,
  SAFE_CAST(quantity AS INT64) as quantity,
  SAFE_CAST(amount AS FLOAT64) as amount,
  SAFE_CAST(sale_date AS DATE) as sale_date,
  TRIM(UPPER(region)) as region,
  CURRENT_TIMESTAMP() as loaded_at
FROM ${ref("sales")}
WHERE sale_id IS NOT NULL
  AND SAFE_CAST(amount AS FLOAT64) > 0

Intermediate Layer

-- definitions/intermediate/int_sales_enriched.sqlx
config {
  type: "table",
  schema: "intermediate",
  description: "Sales data enriched with customer and product information",
  tags: ["daily", "sales"]
}

WITH sales AS (
  SELECT * FROM ${ref("stg_sales")}
),

customers AS (
  SELECT * FROM ${ref("stg_customers")}
),

products AS (
  SELECT * FROM ${ref("stg_products")}
),

enriched_sales AS (
  SELECT
    s.sale_id,
    s.product_id,
    p.product_name,
    p.category,
    s.customer_id,
    c.customer_name,
    c.customer_segment,
    s.quantity,
    s.amount,
    s.sale_date,
    s.region,
    DATE_DIFF(CURRENT_DATE(), s.sale_date, DAY) as days_since_sale,
    s.loaded_at
  FROM sales s
  LEFT JOIN customers c ON s.customer_id = c.customer_id
  LEFT JOIN products p ON s.product_id = p.product_id
)

SELECT * FROM enriched_sales

Marts Layer

-- definitions/marts/sales_daily_summary.sqlx
config {
  type: "table",
  schema: "marts",
  description: "Daily sales summary for reporting",
  tags: ["daily", "reporting"],
  dependencies: ["intermediate.int_sales_enriched"]
}

WITH daily_sales AS (
  SELECT
    sale_date,
    region,
    category,
    customer_segment,
    COUNT(DISTINCT sale_id) as transaction_count,
    COUNT(DISTINCT customer_id) as unique_customers,
    SUM(quantity) as total_quantity,
    SUM(amount) as total_amount,
    AVG(amount) as avg_order_value,
    MIN(amount) as min_order_value,
    MAX(amount) as max_order_value
  FROM ${ref("int_sales_enriched")}
  WHERE sale_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 365 DAY)
  GROUP BY 1, 2, 3, 4
)

SELECT
  *,
  SAFE_DIVIDE(total_amount, unique_customers) as revenue_per_customer,
  SAFE_DIVIDE(total_amount, transaction_count) as revenue_per_transaction
FROM daily_sales

Assertions

Assertions validate data quality and business rules.

Basic Assertions

-- definitions/tests/assert_unique_sale_id.sqlx
config {
  type: "assertion",
  schema: "dataform_assertions",
  description: "Assert that sale_id is unique"
}

SELECT sale_id
FROM ${ref("int_sales_enriched")}
GROUP BY sale_id
HAVING COUNT(*) > 1

Not Null Assertions

-- definitions/tests/assert_not_null_amount.sqlx
config {
  type: "assertion",
  schema: "dataform_assertions",
  description: "Assert that amount is not null"
}

SELECT amount
FROM ${ref("int_sales_enriched")}
WHERE amount IS NULL

Range Assertions

-- definitions/tests/assert_valid_amounts.sqlx
config {
  type: "assertion",
  schema: "dataform_assertions",
  description: "Assert that amounts are within valid range"
}

SELECT amount
FROM ${ref("int_sales_enriched")}
WHERE amount < 0
   OR amount > 1000000

Custom Assertions

-- definitions/tests/assert_referential_integrity.sqlx
config {
  type: "assertion",
  schema: "dataform_assertions",
  description: "Assert referential integrity between sales and customers"
}

WITH sales_customers AS (
  SELECT DISTINCT customer_id
  FROM ${ref("int_sales_enriched")}
),

customers AS (
  SELECT DISTINCT customer_id
  FROM ${ref("stg_customers")}
)

SELECT sc.customer_id
FROM sales_customers sc
LEFT JOIN customers c ON sc.customer_id = c.customer_id
WHERE c.customer_id IS NULL
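
The assertion queries above share one contract: each selects the rows that violate a rule, and the assertion passes only when the query returns no rows. The sketch below illustrates that contract in Python, using an in-memory SQLite table as a stand-in for BigQuery; the sample rows and the failing_rows helper are made up for illustration.

```python
import sqlite3


def failing_rows(connection, assertion_sql):
    """Run an assertion query and return the rows that violate the rule."""
    return connection.execute(assertion_sql).fetchall()


# SQLite stands in for BigQuery here; the data is invented for the example.
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE int_sales_enriched (sale_id INTEGER, amount REAL)")
connection.executemany(
    "INSERT INTO int_sales_enriched VALUES (?, ?)",
    [(1, 10.0), (2, 25.5), (2, 25.5), (3, None)],  # duplicate id 2, NULL amount on id 3
)

UNIQUE_SALE_ID = """
    SELECT sale_id
    FROM int_sales_enriched
    GROUP BY sale_id
    HAVING COUNT(*) > 1
"""
NOT_NULL_AMOUNT = "SELECT sale_id FROM int_sales_enriched WHERE amount IS NULL"

RESULTS = {
    "assert_unique_sale_id": failing_rows(connection, UNIQUE_SALE_ID),
    "assert_not_null_amount": failing_rows(connection, NOT_NULL_AMOUNT),
}

for name, rows in RESULTS.items():
    # Zero rows means the rule holds; any returned row is a violation.
    status = "PASS" if not rows else f"FAIL ({len(rows)} offending rows)"
    print(name, status)
```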

Operations

Operations run custom SQL that does not fit the table, view, or assertion types, such as DML and DDL statements.

Data Operations

-- definitions/operations/cleanup_old_data.sqlx
config {
  type: "operations",
  schema: "operations",
  description: "Clean up old data from staging tables",
  tags: ["cleanup", "maintenance"]
}

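-- Delete rows older than roughly 7 years (2555 days).
-- The source declaration is named "sales", so that is the name ref() needs.
DELETE FROM ${ref("sales")}
WHERE sale_date < DATE_SUB(CURRENT_DATE(), INTERVAL 2555 DAY);

-- BigQuery has no DROP PARTITION statement; let old partitions expire instead
-- (assumes the table is partitioned by sale_date)
ALTER TABLE ${ref("sales")}
SET OPTIONS (partition_expiration_days = 2555);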

Schema Operations

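-- definitions/operations/create_tables.sqlx
config {
  type: "operations",
  schema: "operations",
  name: "sales_partitioned",
  hasOutput: true,
  description: "Create tables with partitioning and clustering"
}

-- Create an empty partitioned and clustered table with the upstream schema.
-- self() resolves to this action's own output (operations.sales_partitioned);
-- ref() is only for other actions, and hasOutput lets them ref() this table.
CREATE TABLE IF NOT EXISTS ${self()}
PARTITION BY sale_date
CLUSTER BY region, category
AS
SELECT * FROM ${ref("int_sales_enriched")}
WHERE FALSE;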

Incremental Models

Incremental models process only new or changed data.

Basic Incremental Model

-- definitions/marts/sales_incremental.sqlx
config {
  type: "incremental",
  schema: "marts",
  description: "Incremental sales processing",
  uniqueKey: ["sale_id"]
}

-- Setting uniqueKey makes Dataform merge on sale_id instead of appending rows
SELECT
  sale_id,
  product_id,
  customer_id,
  quantity,
  amount,
  sale_date,
  region,
  CURRENT_TIMESTAMP() as updated_at
FROM ${ref("int_sales_enriched")}

-- On incremental runs, only pick up rows newer than what the table already holds
${when(incremental(), `WHERE sale_date > (SELECT MAX(sale_date) FROM ${self()})`)}

Incremental Strategies

# Incremental behavior in Dataform on BigQuery, keyed by configuration
strategies = {
    'no uniqueKey': 'Append: new rows are inserted',
    'uniqueKey set': 'Merge: rows matching the key are updated, all others are inserted',
}

# Example: merge on a composite key
config = {
    'type': 'incremental',
    'uniqueKey': ['id', 'date'],
    'bigquery': {
        # Limits which partitions are scanned when looking for rows to update
        'updatePartitionFilter': 'date >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)',
    },
}
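
To see how appending and merging differ on the same input, here is a small Python model of the two outcomes. It is an illustration only: rows are plain dicts, and the append_rows and merge_rows helpers are hypothetical names, not Dataform functions.

```python
def append_rows(target, new_rows):
    """Append: every incoming row is inserted, even if its key already exists."""
    return target + new_rows


def merge_rows(target, new_rows, unique_key):
    """Merge: incoming rows replace rows with the same key, others are inserted."""
    def key_of(row):
        return tuple(row[column] for column in unique_key)

    merged = {key_of(row): row for row in target}
    for row in new_rows:
        merged[key_of(row)] = row  # update on match, insert otherwise
    return list(merged.values())


EXISTING = [
    {"sale_id": 1, "amount": 10.0},
    {"sale_id": 2, "amount": 20.0},
]
INCOMING = [
    {"sale_id": 2, "amount": 25.0},  # corrected amount for an existing sale
    {"sale_id": 3, "amount": 30.0},  # brand-new sale
]

APPENDED = append_rows(EXISTING, INCOMING)
MERGED = merge_rows(EXISTING, INCOMING, unique_key=["sale_id"])

print(len(APPENDED), "rows after append")
print(len(MERGED), "rows after merge")
```

With append, sale 2 appears twice; with merge, it appears once with the corrected amount. This is why a unique key matters when source rows can be restated.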

JavaScript Templating

Use JavaScript for dynamic SQL generation.

JavaScript Functions

-- definitions/marts/dynamic_sales.sqlx
config {
  type: "table",
  schema: "marts",
  description: "Dynamic sales table with configurable filters"
}

WITH filtered_sales AS (
  SELECT *
  FROM ${ref("int_sales_enriched")}
  WHERE 1=1
    ${when(
      dataform.projectConfig.vars.environment === "production",
      "AND sale_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)",
      "AND sale_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)"
    )}
)

SELECT
  sale_date,
  region,
  SUM(amount) as total_amount,
  COUNT(*) as transaction_count
FROM filtered_sales
GROUP BY 1, 2
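
The conditional in the example above is resolved when the project is compiled, so BigQuery only ever receives one of the two filters. The sketch below models that substitution in Python. The pick_fragment and compile_query helpers are hypothetical stand-ins for the templating step, not Dataform APIs, and the query is shortened to the filter part.

```python
TEMPLATE = """SELECT *
FROM int_sales_enriched
WHERE 1=1
  {environment_filter}"""


def pick_fragment(condition, true_case, false_case=""):
    """Return one SQL fragment based on a condition known at compile time."""
    return true_case if condition else false_case


def compile_query(project_vars):
    """Render the template for a given set of project variables."""
    environment_filter = pick_fragment(
        project_vars.get("environment") == "production",
        "AND sale_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)",
        "AND sale_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)",
    )
    return TEMPLATE.format(environment_filter=environment_filter)


PRODUCTION_SQL = compile_query({"environment": "production"})
DEVELOPMENT_SQL = compile_query({"environment": "development"})

print(PRODUCTION_SQL)
print(DEVELOPMENT_SQL)
```

The leading WHERE 1=1 is what lets each fragment start with AND regardless of which branch is chosen.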

JavaScript Includes

// includes/functions.js
module.exports = {
  formatDate: function(dateColumn, format) {
    switch(format) {
      case 'YYYY-MM-DD':
        return `FORMAT_DATE('%Y-%m-%d', ${dateColumn})`;
      case 'YYYYMMDD':
        return `FORMAT_DATE('%Y%m%d', ${dateColumn})`;
      default:
        return dateColumn;
    }
  },
  
  calculateAge: function(birthDate, referenceDate) {
    return `DATE_DIFF(${referenceDate}, ${birthDate}, YEAR)`;
  },
  
  safeDivide: function(numerator, denominator) {
    return `SAFE_DIVIDE(${numerator}, ${denominator})`;
  }
};

Scheduling

Schedule Dataform runs using Cloud Composer or Cloud Scheduler.

Cloud Composer Integration

# Airflow DAG for Dataform scheduling
from airflow import DAG
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'email_on_failure': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'dataform_daily_pipeline',
    default_args=default_args,
    description='Daily Dataform execution',
    schedule_interval='0 6 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

# Compile Dataform project
compile_task = DataformCreateCompilationResultOperator(
    task_id='compile_dataform',
    project_id='my-project',
    location='us-central1',
    repository_id='my-dataform-repo',
    compilation_result={
        'git_commitish': 'main'
    },
    dag=dag,
)

# Invoke workflow
invoke_task = DataformCreateWorkflowInvocationOperator(
    task_id='invoke_dataform',
    project_id='my-project',
    location='us-central1',
    repository_id='my-dataform-repo',
    workflow_invocation={
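        # The compile task pushes the whole compilation result to XCom; the invocation needs its resource name
        'compilation_result': "{{ task_instance.xcom_pull('compile_dataform')['name'] }}",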
        'invocation_config': {
            'included_tags': ['daily'],
            'transitive_dependencies_included': True
        }
    },
    dag=dag,
)

compile_task >> invoke_task

Cloud Scheduler

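# Create a Cloud Scheduler job that starts a workflow invocation
# (assumes my-workflow already exists as a workflow configuration in the repository)
gcloud scheduler jobs create http dataform-daily-run \
    --location=us-central1 \
    --schedule="0 6 * * *" \
    --uri="https://dataform.googleapis.com/v1beta1/projects/my-project/locations/us-central1/repositories/my-repo/workflowInvocations" \
    --http-method=POST \
    --headers="Content-Type=application/json" \
    --message-body='{"workflowConfig": "projects/my-project/locations/us-central1/repositories/my-repo/workflowConfigs/my-workflow"}' \
    --oauth-service-account-email=my-service-account@my-project.iam.gserviceaccount.com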

Best Practices

  1. Use layered architecture - Sources, Staging, Intermediate, Marts
  2. Implement assertions - Validate data quality at each layer
  3. Use incremental models - Optimize processing for large datasets
  4. Version control - Store all code in Git repositories
  5. Document everything - Use descriptions and tags
  6. Test regularly - Run assertions on every execution
  7. Monitor performance - Track execution times and resource usage
⭐

Premium Content

Dataform for SQL-Based Data Transformation

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯End-to-end Projects
πŸ’ΌInterview Prep
πŸ“œCertificates
🀝Community Access

Already a member? Log in

Need Expert GCP Data Engineering Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement