BigQuery Provider Integration with Airflow

Formal Definitions

BigQuery Provider

The Google provider package (apache-airflow-providers-google) provides operators, hooks, triggers, and sensors for Google BigQuery. They wrap the BigQuery Python client library (google-cloud-bigquery), authenticate through Airflow Google Cloud connections, and expose BigQuery work as Airflow-native tasks.

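Partition Strategy

BigQuery partitioning divides a large table into segments (partitions). Partition strategies include time-unit partitioning on a DATE, TIMESTAMP, or DATETIME column ($P \in \{P_{\text{HOUR}}, P_{\text{DAY}}, P_{\text{MONTH}}, P_{\text{YEAR}}\}$), ingestion-time partitioning, and integer-range partitioning. For a query that filters on the partition column and touches a single partition, partition pruning reduces the data scanned from $O(N)$ to roughly $O(N/P_{\text{partitions}})$, assuming rows are spread evenly across the $P_{\text{partitions}}$ partitions.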
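To make the $O(N/P)$ intuition concrete, the sketch below models a table as a mapping from partition key to partition size and counts the bytes a query would read with and without a partition filter. The `bytes_scanned` helper and the 365 evenly sized 2 GB partitions are hypothetical; real scans also depend on which columns are selected and on clustering.

```python
from datetime import date, timedelta
from typing import Dict, Optional, Set


def bytes_scanned(partition_bytes: Dict[str, int], wanted: Optional[Set[str]]) -> int:
    """Return the bytes a query reads from a partitioned table.

    wanted=None models a query with no partition filter (full scan);
    otherwise only partitions whose key is in `wanted` are read (pruning).
    """
    if wanted is None:
        return sum(partition_bytes.values())
    return sum(size for key, size in partition_bytes.items() if key in wanted)


# Hypothetical table: one 2 GB partition per day of 2023 (365 partitions).
GB = 10**9
partitions = {
    (date(2023, 1, 1) + timedelta(days=i)).isoformat(): 2 * GB
    for i in range(365)
}

full = bytes_scanned(partitions, None)              # no filter on order_date
pruned = bytes_scanned(partitions, {"2023-06-15"})  # WHERE order_date = '2023-06-15'
print(full // GB, pruned // GB, full // pruned)     # 730 2 365
```

The ratio between the two scans equals the partition count, which is the $N$ versus $N/P$ relationship under the even-distribution assumption.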

Detailed Explanation

Connection Setup

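# Airflow connection for BigQuery (Admin > Connections)
# Connection ID: google_cloud_default
# Connection Type: Google Cloud
# Project ID: my-gcp-project
# Keyfile Path: /path/to/service-account-key.json
#   (or paste the key's contents into Keyfile JSON instead;
#    leave both empty to use Application Default Credentials)
# Extra: {"scope": "https://www.googleapis.com/auth/bigquery"}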
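The same connection can be supplied without the UI through an environment variable. This is a sketch that assumes Airflow 2.3+ (which accepts JSON-encoded connection variables) and a Google provider version that reads un-prefixed extra keys (`project`, `key_path`, `scope`); the project ID and key path are placeholders.

```python
import json
import os

# Placeholder values: substitute your own project ID and key path.
connection = {
    "conn_type": "google_cloud_platform",  # type name behind "Google Cloud" in the UI
    "extra": {
        "project": "my-gcp-project",
        "key_path": "/opt/airflow/keys/service-account.json",
        "scope": "https://www.googleapis.com/auth/bigquery",
    },
}

# Airflow resolves AIRFLOW_CONN_<CONN_ID> (upper-cased) variables as connections.
# Setting it here affects only this process; in a deployment, export it in the
# environment of the scheduler and workers instead.
os.environ["AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT"] = json.dumps(connection)
print(os.environ["AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT"])
```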

Query Execution

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryInsertJobOperator,
)

with DAG(
    dag_id='bigquery_etl',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',  # `schedule` replaces `schedule_interval` (deprecated in 2.4, removed in 3.0)
    catchup=False,
    tags=['bigquery', 'etl'],
) as dag:

    # Target fact table: daily partitions on order_date, clustered by customer.
    create_table = BigQueryInsertJobOperator(
        task_id='create_partitioned_table',
        configuration={
            'query': {
                'query': """
                    CREATE TABLE IF NOT EXISTS `project.dataset.fct_orders`
                    (
                        customer_id INT64,
                        order_date DATE,
                        total_amount NUMERIC,
                        order_count INT64
                    )
                    PARTITION BY order_date
                    CLUSTER BY customer_id
                """,
                'useLegacySql': False,
            }
        },
        location='US',
    )

    # Replace the staging table with the raw Parquet files from GCS.
    load_data = BigQueryInsertJobOperator(
        task_id='load_from_gcs',
        configuration={
            'load': {
                'sourceUris': ['gs://data-lake/raw/orders/*.parquet'],
                'destinationTable': {
                    'projectId': 'project',
                    'datasetId': 'dataset',
                    'tableId': 'stg_orders',
                },
                'sourceFormat': 'PARQUET',
                'writeDisposition': 'WRITE_TRUNCATE',
                'timePartitioning': {
                    'type': 'DAY',
                    'field': 'order_date',
                },
                'clustering': {
                    'fields': ['customer_id'],
                },
            }
        },
        location='US',
    )

    # Aggregate the run date's orders per customer into the fact table.
    # The partition filter on order_date limits the scan to one partition.
    # INSERT appends, so re-running the same ds duplicates that day's rows.
    transform = BigQueryInsertJobOperator(
        task_id='transform_orders',
        configuration={
            'query': {
                'query': """
                    INSERT INTO `project.dataset.fct_orders`
                        (customer_id, order_date, total_amount, order_count)
                    SELECT
                        customer_id,
                        order_date,
                        SUM(amount) AS total_amount,
                        COUNT(*) AS order_count
                    FROM `project.dataset.stg_orders`
                    WHERE order_date = DATE('{{ ds }}')
                    GROUP BY customer_id, order_date
                """,
                'useLegacySql': False,
            }
        },
        location='US',
    )

    create_table >> load_data >> transform
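Because the transform step appends with INSERT, rerunning a DAG run for the same date writes that day's rows twice. One common idempotent alternative is to write the SELECT result into a single partition using BigQuery's `$` partition decorator together with `WRITE_TRUNCATE`, which replaces only that partition. The sketch below builds such a configuration; `partition_overwrite_config` is a hypothetical helper, and it relies on `configuration` being a templated field of BigQueryInsertJobOperator so that `{{ ds_nodash }}` and `{{ ds }}` are filled in at run time.

```python
def partition_overwrite_config(project, dataset, table, partition_id, select_sql):
    """Build a BigQueryInsertJobOperator configuration that replaces one partition.

    partition_id is YYYYMMDD for daily partitions (e.g. '{{ ds_nodash }}').
    select_sql must be a SELECT (not INSERT) whose rows all fall inside that
    partition and whose columns match the destination table's schema.
    """
    return {
        "query": {
            "query": select_sql,
            "useLegacySql": False,
            # The $ decorator targets a single partition of the destination table.
            "destinationTable": {
                "projectId": project,
                "datasetId": dataset,
                "tableId": f"{table}${partition_id}",
            },
            # Overwrites only the decorated partition, so reruns are idempotent.
            "writeDisposition": "WRITE_TRUNCATE",
        }
    }


config = partition_overwrite_config(
    "project",
    "dataset",
    "fct_orders",
    "{{ ds_nodash }}",
    """
    SELECT customer_id, order_date,
           SUM(amount) AS total_amount, COUNT(*) AS order_count
    FROM `project.dataset.stg_orders`
    WHERE order_date = DATE('{{ ds }}')
    GROUP BY customer_id, order_date
    """,
)
print(config["query"]["destinationTable"]["tableId"])  # fct_orders${{ ds_nodash }}
```

Passing `configuration=config` to a BigQueryInsertJobOperator in place of the INSERT statement makes a rerun for the same date replace that day's partition instead of appending to it.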

GCS to BigQuery Loading

# GCSToBigQueryOperator lives in the transfers module, not operators.bigquery.
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)

load_csv = GCSToBigQueryOperator(
    task_id='load_csv_to_bq',
    bucket='data-lake',
    source_objects=['raw/customers/*.csv'],
    destination_project_dataset_table='project.dataset.customers',
    source_format='CSV',
    skip_leading_rows=1,
    field_delimiter=',',
    write_disposition='WRITE_APPEND',
    create_disposition='CREATE_IF_NEEDED',
    schema_fields=[
        {'name': 'customer_id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
        {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'email', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'created_at', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'},
    ],
    location='US',
)
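Explicit schemas are easy to mistype, and a bad mode or a duplicated column only surfaces when the load job fails. The hypothetical helper below builds the `schema_fields` list from compact tuples and checks it while the DAG file is parsed; it validates modes and duplicate names (compared case-insensitively, as BigQuery treats column names) but does not validate type names.

```python
# Hypothetical helper: build and sanity-check schema_fields at DAG-parse time.
VALID_MODES = {"NULLABLE", "REQUIRED", "REPEATED"}


def build_schema_fields(columns):
    """Turn (name, type, mode) tuples into the list-of-dicts shape schema_fields uses.

    Raises ValueError for an unknown mode or a repeated column name.
    """
    seen = set()
    fields = []
    for name, bq_type, mode in columns:
        mode = mode.upper()
        if mode not in VALID_MODES:
            raise ValueError(f"{name}: unknown mode {mode!r}")
        if name.lower() in seen:  # BigQuery column names are case-insensitive
            raise ValueError(f"duplicate column {name!r}")
        seen.add(name.lower())
        fields.append({"name": name, "type": bq_type.upper(), "mode": mode})
    return fields


customer_schema = build_schema_fields([
    ("customer_id", "INTEGER", "REQUIRED"),
    ("name", "STRING", "NULLABLE"),
    ("email", "STRING", "NULLABLE"),
    ("created_at", "TIMESTAMP", "NULLABLE"),
])
```

The result can be passed as `schema_fields=customer_schema`; it produces the same list as the literal schema in the operator call.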

Dataset Management

from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyDatasetOperator,
    BigQueryDeleteDatasetOperator,
    BigQueryUpdateDatasetOperator,
)

# dataset_reference takes a Dataset resource body.
create_dataset = BigQueryCreateEmptyDatasetOperator(
    task_id='create_dataset',
    dataset_id='analytics',
    project_id='my-project',
    dataset_reference={
        'description': 'Analytics dataset',
        'location': 'US',
    },
)

# Only the fields listed in `fields` are updated.
update_dataset = BigQueryUpdateDatasetOperator(
    task_id='update_dataset',
    dataset_id='analytics',
    project_id='my-project',
    dataset_resource={
        'description': 'Updated analytics dataset',
    },
    fields=['description'],
)

# delete_contents=True also drops every table in the dataset.
delete_dataset = BigQueryDeleteDatasetOperator(
    task_id='delete_staging',
    dataset_id='staging',
    project_id='my-project',
    delete_contents=True,
)

Sensors

from airflow.providers.google.cloud.sensors.bigquery import (
    BigQueryTableExistenceSensor,
)

wait_for_table = BigQueryTableExistenceSensor(
    task_id='wait_for_table',
    project_id='my-project',
    dataset_id='dataset',
    table_id='orders',
    poke_interval=30,
    timeout=3600,
    mode='reschedule',
)
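The sensor takes the table as three separate IDs, while GCSToBigQueryOperator and most SQL refer to a single `project.dataset.table` string. A small hypothetical helper can convert between the two forms; it splits from the right so that legacy domain-scoped project IDs such as `example.com:proj` stay intact.

```python
def split_table_ref(table_ref: str) -> dict:
    """Split 'project.dataset.table' into project_id/dataset_id/table_id kwargs."""
    parts = table_ref.rsplit(".", 2)
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected 'project.dataset.table', got {table_ref!r}")
    project_id, dataset_id, table_id = parts
    return {"project_id": project_id, "dataset_id": dataset_id, "table_id": table_id}


print(split_table_ref("my-project.dataset.orders"))
# {'project_id': 'my-project', 'dataset_id': 'dataset', 'table_id': 'orders'}
```

Usage: `BigQueryTableExistenceSensor(task_id='wait_for_table', **split_table_ref('my-project.dataset.orders'), poke_interval=30, timeout=3600, mode='reschedule')`.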

Partitioned Query with Parameters

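from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryInsertJobOperator,
)


def generate_partition_dates(start_date: str, end_date: str) -> list:
    """Return each date (YYYY-MM-DD) from start_date to end_date, inclusive.

    Call it with literal dates: Jinja templates such as '{{ ds }}' are only
    rendered when a task runs, not while the DAG file is parsed.
    """
    dates = []
    current = datetime.strptime(start_date, '%Y-%m-%d')
    end = datetime.strptime(end_date, '%Y-%m-%d')

    while current <= end:
        dates.append(current.strftime('%Y-%m-%d'))
        current += timedelta(days=1)

    return dates


# The date is bound as a named query parameter instead of being formatted into
# the SQL string; filtering on order_date lets BigQuery prune to one partition.
# Assumes the orders table has a status column.
PARTITION_SQL = """
    SELECT * FROM `project.dataset.orders`
    WHERE order_date = @partition_date
    AND status = 'completed'
"""

with DAG(
    dag_id='partitioned_processing',
    start_date=datetime(2024, 1, 1),
    schedule=None,  # triggered on demand to reprocess a fixed date range
    catchup=False,
) as dag:

    prev_task = None

    # One task per partition, chained so partitions are processed in order.
    for date_str in generate_partition_dates('2024-01-01', '2024-01-07'):
        task = BigQueryInsertJobOperator(
            task_id=f'process_{date_str}',
            configuration={
                'query': {
                    'query': PARTITION_SQL,
                    'useLegacySql': False,
                    'parameterMode': 'NAMED',
                    'queryParameters': [{
                        'name': 'partition_date',
                        'parameterType': {'type': 'DATE'},
                        'parameterValue': {'value': date_str},
                    }],
                }
            },
            location='US',
        )
        if prev_task:
            prev_task >> task
        prev_task = task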
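BigQuery's named query parameters (`parameterMode: 'NAMED'` plus a `queryParameters` list inside the query configuration) bind values to `@name` placeholders instead of formatting them into SQL. When a query takes several parameters, the list can be generated from a dict. The helper below is a hypothetical sketch that covers scalar parameters only; values may be Jinja templates such as `'{{ ds }}'` because the operator's `configuration` is templated.

```python
def named_query_parameters(params):
    """Convert {name: (type, value)} into BigQuery queryParameters entries.

    Scalar parameters only; values are passed as strings, which is how the
    REST API encodes parameter values.
    """
    return [
        {
            "name": name,
            "parameterType": {"type": bq_type},
            "parameterValue": {"value": str(value)},
        }
        for name, (bq_type, value) in params.items()
    ]


query_params = named_query_parameters({
    "partition_date": ("DATE", "{{ ds }}"),
    "status": ("STRING", "completed"),
})
```

A query using `WHERE order_date = @partition_date AND status = @status` would then set `'parameterMode': 'NAMED'` and `'queryParameters': query_params`.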

Key Concepts Table

| Operator | Purpose | Key Parameters |
|---|---|---|
| BigQueryInsertJobOperator | Run any BigQuery job (query, load, copy, extract) | configuration, location |
| GCSToBigQueryOperator | Load GCS files into BigQuery | bucket, source_objects, destination_project_dataset_table |
| BigQueryToBigQueryOperator | Copy between tables | source_project_dataset_tables, destination_project_dataset_table |
| BigQueryCreateEmptyDatasetOperator | Create a dataset | dataset_id, project_id |
| BigQueryDeleteDatasetOperator | Delete a dataset | dataset_id, delete_contents |
| BigQueryTableExistenceSensor | Wait for a table to exist | project_id, dataset_id, table_id |

Partitioning Strategies

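| Strategy | Partition Key | Best For | Pruning Granularity |
|---|---|---|---|
| DAY | DATE, TIMESTAMP, or DATETIME column | Most time-series data | One day |
| MONTH | DATE, TIMESTAMP, or DATETIME column | Long histories with modest daily volume | One month |
| YEAR | DATE, TIMESTAMP, or DATETIME column | Many years of low-volume data | One year |
| HOUR | TIMESTAMP or DATETIME column | High-volume data queried over short windows | One hour |
| INGESTION_TIME | Load time (_PARTITIONTIME pseudocolumn) | Sources without a reliable date column | Chosen time unit |
| INTEGER_RANGE | INT64 column with start, end, and interval | Non-temporal ranges | One interval |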
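Each strategy in the table maps to either a `timePartitioning` or a `rangePartitioning` block in a load or query job configuration. The hypothetical helper below builds those blocks; it follows the BigQuery REST field names and encodes INT64 range bounds as strings, as the REST API does.

```python
from typing import Optional

TIME_UNITS = {"HOUR", "DAY", "MONTH", "YEAR"}


def partitioning_config(strategy: str, field: Optional[str] = None, *,
                        unit: str = "DAY", start: int = 0, end: int = 0,
                        interval: int = 1) -> dict:
    """Return job-configuration keys for one partitioning strategy."""
    strategy = strategy.upper()
    if strategy in TIME_UNITS:
        if field is None:
            raise ValueError("time-unit partitioning needs a DATE, TIMESTAMP, or DATETIME field")
        return {"timePartitioning": {"type": strategy, "field": field}}
    if strategy == "INGESTION_TIME":
        if unit not in TIME_UNITS:
            raise ValueError(f"unknown time unit {unit!r}")
        # No field: rows are assigned to partitions by load time (_PARTITIONTIME).
        return {"timePartitioning": {"type": unit}}
    if strategy == "INTEGER_RANGE":
        if field is None or interval <= 0 or end <= start:
            raise ValueError("integer-range partitioning needs a field, start < end, interval > 0")
        return {"rangePartitioning": {
            "field": field,
            "range": {"start": str(start), "end": str(end), "interval": str(interval)},
        }}
    raise ValueError(f"unknown strategy {strategy!r}")
```

The returned dict can be merged into a load configuration, for example `{'load': {..., **partitioning_config('DAY', 'order_date')}}`.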

Best Practices

  1. Always specify location to match your dataset's geographic location.
  2. Use partitioned tables for time-series data to reduce query costs and improve performance.
  3. Use WRITE_TRUNCATE for full table refreshes, WRITE_APPEND for incremental loads.
  4. Leverage clustering on frequently filtered columns for additional query optimization.
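  5. Set the job priority (configuration.query.priority): INTERACTIVE for ad-hoc work, BATCH for scheduled ETL that can tolerate queueing, since BATCH jobs wait until idle resources are available.
  6. Specify schema_fields explicitly for CSV/JSON loads; schema autodetection infers column types from a sample of the data and can guess wrong.
  7. Use create_disposition='CREATE_IF_NEEDED' for idempotent table creation.
  8. Estimate query cost before running with a dry run (configuration.dryRun), which reports bytes processed without executing the query, and track actual spend with Cloud Billing export.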
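Priority and dry runs live at different levels of the job configuration: `priority` sits inside the `query` block, while `dryRun` sits on the job configuration next to it. The hypothetical helper below builds both shapes following the BigQuery REST field names. Whether BigQueryInsertJobOperator handles a dry-run job cleanly may depend on the provider version, so dry runs are often issued directly through the BigQuery client instead.

```python
def query_job_config(sql: str, *, priority: str = "INTERACTIVE", dry_run: bool = False) -> dict:
    """Build a BigQuery job configuration with an explicit priority and optional dry run."""
    if priority not in ("INTERACTIVE", "BATCH"):
        raise ValueError(f"priority must be INTERACTIVE or BATCH, got {priority!r}")
    config = {
        "query": {
            "query": sql,
            "useLegacySql": False,
            "priority": priority,
        }
    }
    if dry_run:
        # Job-level flag: BigQuery validates the query and reports the bytes it
        # would process without running it.
        config["dryRun"] = True
    return config


nightly = query_job_config("SELECT 1", priority="BATCH")
estimate = query_job_config("SELECT 1", dry_run=True)
```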

Partition pruning reduces the amount of data scanned. Always include the partition column in WHERE clauses. For example: WHERE order_date = DATE('{{ ds }}') ensures only one partition is scanned instead of the entire table.

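Under on-demand pricing, BigQuery charges by bytes processed, so partitioned and clustered tables lower query costs roughly in proportion to the data a query can skip: reading one daily partition of a year of evenly sized partitions scans about 1/365 of the table. Combine them with materialized views for frequently run reports.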
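The cost arithmetic can be sketched as bytes processed (for example, the figure a dry run reports) divided by a tebibyte, times the per-TiB on-demand price. The price used below is a hypothetical placeholder, not a quoted rate; take the real figure from the current BigQuery price list. The estimate ignores free-tier allowances and billing minimums.

```python
TIB = 2**40  # on-demand pricing is quoted per tebibyte


def on_demand_cost(bytes_processed: int, price_per_tib: float) -> float:
    """Rough on-demand cost of a query from the bytes it processes."""
    return bytes_processed / TIB * price_per_tib


# Hypothetical price and sizes, for illustration only.
PRICE = 5.0
full_scan = on_demand_cost(365 * 2**30, PRICE)  # whole table: 365 GiB
one_day = on_demand_cost(2**30, PRICE)          # one daily partition: 1 GiB
```

With 365 evenly sized daily partitions, the pruned query costs 1/365 of the full scan, matching the bytes ratio.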

Key Takeaways:

  • BigQueryInsertJobOperator is the general-purpose operator for any BigQuery job type (query, load, copy, extract)
  • GCSToBigQueryOperator handles file loading from GCS to BigQuery
  • For a query that reads one partition, partitioning reduces the scan from O(N) to roughly O(N/P), where P is the partition count
  • Clustering provides additional optimization for filtered columns
  • Always specify location to match dataset geography
  • Use sensors with mode='reschedule' for efficient table existence checks
