BigQuery Provider Integration with Airflow
Architecture Diagram
Formal Definitions
Definition: BigQuery Provider
The BigQuery provider (apache-airflow-providers-google) provides operators, hooks, triggers, and sensors for Google BigQuery. It wraps the BigQuery Python client library, managing authentication via Google Cloud connections and providing Airflow-native task interfaces.
Definition: Partition Strategy
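BigQuery partitioning divides large tables into segments (partitions). Partition strategies include time-unit partitioning (HOUR, DAY, MONTH, or YEAR), ingestion-time partitioning, and integer-range partitioning. When a query filters on the partition column, partition pruning reduces the data scanned from $O(N)$ to $O(N/P)$, where $N$ is the table size and $P$ is the number of partitions, assuming evenly sized partitions and a filter that selects a single partition.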
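The pruning estimate can be made concrete with a minimal sketch. It assumes rows are spread evenly across partitions, which real tables often are not, so treat the result as a rough guide. The function name `estimated_scan_bytes` and its parameters are illustrative and not part of BigQuery or Airflow.

```python
def estimated_scan_bytes(table_bytes: int, partition_count: int, partitions_read: int = 1) -> float:
    """Estimate bytes scanned when a query filters on the partition column.

    Assumes data is evenly distributed, so each partition holds
    table_bytes / partition_count bytes.
    """
    if partition_count <= 0:
        raise ValueError("partition_count must be positive")
    if not 0 <= partitions_read <= partition_count:
        raise ValueError("partitions_read must be between 0 and partition_count")
    return table_bytes * partitions_read / partition_count


# A 365 GiB table partitioned by day, queried for a single day.
full_scan = 365 * 1024**3
one_day = estimated_scan_bytes(full_scan, partition_count=365)
print(one_day / 1024**3)  # GiB scanned for one daily partition
```

Under the even-distribution assumption, reading one of 365 daily partitions touches 1/365 of the table, which is the $O(N/P)$ case.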
Detailed Explanation
Connection Setup
# Airflow connection for BigQuery
# Connection ID: google_cloud_default
# Connection Type: Google Cloud
# Project ID: my-gcp-project
# Keyfile Path: path to the service account key file (or paste the key contents into Keyfile JSON)
# Extra: {"scope": "https://www.googleapis.com/auth/bigquery"}
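The same connection can also be supplied outside the UI. The sketch below builds a JSON connection definition, assuming an Airflow version that accepts JSON-serialized connections in `AIRFLOW_CONN_*` environment variables and a Google provider version that accepts the short extra keys `project`, `key_path`, and `scope` (older versions used prefixed key names). The helper `google_cloud_conn_json` and the key path are hypothetical.

```python
import json


def google_cloud_conn_json(project_id: str, key_path: str, scopes: list[str]) -> str:
    """Serialize a Google Cloud connection in Airflow's JSON connection format."""
    return json.dumps(
        {
            "conn_type": "google_cloud_platform",
            "extra": {
                "project": project_id,
                "key_path": key_path,
                # Scopes are stored as one comma-separated string.
                "scope": ",".join(scopes),
            },
        }
    )


conn_json = google_cloud_conn_json(
    project_id="my-gcp-project",
    key_path="/path/to/service-account-key.json",  # hypothetical path
    scopes=["https://www.googleapis.com/auth/bigquery"],
)
# Exporting this value as AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT would define
# the google_cloud_default connection without using the UI.
print(conn_json)
```

Keeping the definition in an environment variable makes it easy to vary per deployment, but the key file itself still needs to be distributed securely.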
Query Execution
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryInsertJobOperator,
)

with DAG(
    dag_id='bigquery_etl',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    tags=['bigquery', 'etl'],
) as dag:
    create_table = BigQueryInsertJobOperator(
        task_id='create_partitioned_table',
        configuration={
            'query': {
                'query': """
                    CREATE TABLE IF NOT EXISTS `project.dataset.orders`
                    (
                        order_id INT64,
                        customer_id INT64,
                        amount NUMERIC,
                        order_date DATE
                    )
                    PARTITION BY order_date
                    CLUSTER BY customer_id
                """,
                'useLegacySql': False,
            }
        },
        location='US',
    )

    load_data = BigQueryInsertJobOperator(
        task_id='load_from_gcs',
        configuration={
            'load': {
                'sourceUris': ['gs://data-lake/raw/orders/*.parquet'],
                'destinationTable': {
                    'projectId': 'project',
                    'datasetId': 'dataset',
                    'tableId': 'stg_orders',
                },
                'sourceFormat': 'PARQUET',
                'writeDisposition': 'WRITE_TRUNCATE',
                'timePartitioning': {
                    'type': 'DAY',
                    'field': 'order_date',
                },
                'clustering': {
                    'fields': ['customer_id'],
                },
            }
        },
        location='US',
    )

    transform = BigQueryInsertJobOperator(
        task_id='transform_orders',
        configuration={
            'query': {
                'query': """
                    INSERT INTO `project.dataset.fct_orders`
                    SELECT
                        order_id,
                        customer_id,
                        SUM(amount) as total_amount,
                        COUNT(*) as order_count,
                        MIN(order_date) as first_order_date
                    FROM `project.dataset.stg_orders`
                    WHERE order_date = DATE('{{ ds }}')
                    GROUP BY order_id, customer_id
                """,
                'useLegacySql': False,
            }
        },
        location='US',
    )

    create_table >> load_data >> transform
GCS to BigQuery Loading
# GCSToBigQueryOperator lives in the transfers package, not operators.bigquery.
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)

load_csv = GCSToBigQueryOperator(
    task_id='load_csv_to_bq',
    bucket='data-lake',
    source_objects=['raw/customers/*.csv'],
    destination_project_dataset_table='project.dataset.customers',
    source_format='CSV',
    skip_leading_rows=1,
    field_delimiter=',',
    write_disposition='WRITE_APPEND',
    create_disposition='CREATE_IF_NEEDED',
    schema_fields=[
        {'name': 'customer_id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
        {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'email', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'created_at', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'},
    ],
    location='US',
)
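When several tables follow the same load pattern, the `schema_fields` list can be generated from a compact column specification instead of being written out by hand. The helper below is a hypothetical convenience, not part of the provider; it only produces dicts in the same shape as the example above.

```python
def make_schema_fields(columns):
    """Turn (name, type, required) tuples into BigQuery schema field dicts."""
    return [
        {
            "name": name,
            "type": bq_type,
            "mode": "REQUIRED" if required else "NULLABLE",
        }
        for name, bq_type, required in columns
    ]


customer_schema = make_schema_fields(
    [
        ("customer_id", "INTEGER", True),
        ("name", "STRING", False),
        ("email", "STRING", False),
        ("created_at", "TIMESTAMP", False),
    ]
)
print(customer_schema)
```

The resulting list could be passed as `schema_fields=customer_schema`, which keeps the column definitions in one place if the same schema is reused elsewhere.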
Dataset Management
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateDatasetOperator,
    BigQueryDeleteDatasetOperator,
    BigQueryUpdateDatasetOperator,
)

create_dataset = BigQueryCreateDatasetOperator(
    task_id='create_dataset',
    dataset_id='analytics',
    project_id='my-project',
    dataset_reference={
        'description': 'Analytics dataset',
        'location': 'US',
    },
)

# The update operator takes the new values in dataset_resource and the
# list of properties to change in fields.
update_dataset = BigQueryUpdateDatasetOperator(
    task_id='update_dataset',
    dataset_id='analytics',
    dataset_resource={
        'description': 'Updated analytics dataset',
    },
    fields=['description'],
)

# delete_contents=True also removes any tables inside the dataset.
delete_dataset = BigQueryDeleteDatasetOperator(
    task_id='delete_staging',
    dataset_id='staging',
    delete_contents=True,
)
Sensors
from airflow.providers.google.cloud.sensors.bigquery import (
    BigQueryTableExistenceSensor,
)

wait_for_table = BigQueryTableExistenceSensor(
    task_id='wait_for_table',
    project_id='my-project',
    dataset_id='dataset',
    table_id='orders',
    poke_interval=30,   # seconds between checks
    timeout=3600,       # give up after one hour
    mode='reschedule',  # release the worker slot between checks
)
Partitioned Query with Parameters
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryInsertJobOperator,
)

# Illustrative date window. The range must be concrete at parse time:
# Jinja templates such as '{{ ds }}' are rendered only when a task runs,
# so they cannot be passed to datetime.strptime() while the DAG is built.
RANGE_START = '2024-01-01'
RANGE_END = '2024-01-03'


def generate_partition_queries(start_date: str, end_date: str) -> list:
    """Generate one (date, query) pair per daily partition in the range."""
    queries = []
    current = datetime.strptime(start_date, '%Y-%m-%d')
    end = datetime.strptime(end_date, '%Y-%m-%d')
    while current <= end:
        date_str = current.strftime('%Y-%m-%d')
        query = f"""
            SELECT * FROM `project.dataset.orders`
            WHERE order_date = '{date_str}'
            AND status = 'completed'
        """
        queries.append((date_str, query))
        current += timedelta(days=1)
    return queries


with DAG(
    dag_id='partitioned_processing',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    queries = generate_partition_queries(RANGE_START, RANGE_END)
    prev_task = None
    for date_str, sql in queries:
        task = BigQueryInsertJobOperator(
            task_id=f'process_{date_str}',
            configuration={
                'query': {
                    'query': sql,
                    'useLegacySql': False,
                }
            },
            location='US',
        )
        # Chain the tasks so partitions are processed in date order.
        if prev_task:
            prev_task >> task
        prev_task = task
Key Concepts Table
| Operator | Purpose | Key Parameters |
|---|---|---|
| BigQueryInsertJobOperator | Execute any BQ job | configuration, location |
| GCSToBigQueryOperator | Load GCS files to BQ | bucket, source_objects, destination_project_dataset_table |
| BigQueryToBigQueryOperator | Copy between tables | source_project_dataset_tables, destination_project_dataset_table |
| BigQueryCreateDatasetOperator | Create dataset | dataset_id, project_id |
| BigQueryDeleteDatasetOperator | Delete dataset | dataset_id, delete_contents |
| BigQueryTableExistenceSensor | Wait for table | project_id, dataset_id, table_id |
Partitioning Strategies
| Strategy | Partition Key | Best For | Query Pruning |
|---|---|---|---|
| DAY | DATE column | High-cardinality time series | Excellent |
| MONTH | DATE column | Medium-cardinality aggregates | Good |
| YEAR | DATE column | Low-cardinality historical | Moderate |
| HOUR | TIMESTAMP column | Real-time analytics | Excellent |
| INGESTION_TIME | Load timestamp | When source lacks dates | Moderate |
| INTEGER_RANGE | INT64 column | Non-temporal ranges | Good |
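Granularity also determines how many partitions a table accumulates, which matters because BigQuery limits the number of partitions per table. The following sketch counts the time-unit partitions touched by an inclusive date range; `partition_count` is an illustrative helper, and the result should be compared against the current quota in the BigQuery documentation.

```python
from datetime import date


def partition_count(start: date, end: date, granularity: str) -> int:
    """Count time-unit partitions touched by the inclusive range [start, end]."""
    if end < start:
        raise ValueError("end must not be before start")
    days = (end - start).days + 1
    if granularity == "HOUR":
        return days * 24
    if granularity == "DAY":
        return days
    if granularity == "MONTH":
        return (end.year - start.year) * 12 + (end.month - start.month) + 1
    if granularity == "YEAR":
        return end.year - start.year + 1
    raise ValueError(f"unsupported granularity: {granularity}")


start, end = date(2024, 1, 1), date(2024, 12, 31)
for unit in ("HOUR", "DAY", "MONTH", "YEAR"):
    print(unit, partition_count(start, end, unit))
```

For one year of data, hourly partitioning produces thousands of partitions while monthly partitioning produces twelve, so the finest granularity is not always the right choice.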
Best Practices
- Always specify `location` to match your dataset's geographic location.
- Use partitioned tables for time-series data to reduce query costs and improve performance.
- Use `WRITE_TRUNCATE` for full table refreshes, `WRITE_APPEND` for incremental loads.
- Leverage clustering on frequently filtered columns for additional query optimization.
- Set `priority` on jobs: `INTERACTIVE` for ad-hoc, `BATCH` for scheduled ETL.
- Handle `schema_fields` explicitly for CSV/JSON loads to avoid schema detection overhead.
- Use `create_disposition='CREATE_IF_NEEDED'` for idempotent table creation.
- Monitor BQ job costs through `configuration.dryRun` and GCP billing exports.
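The `priority` and `dryRun` settings mentioned above live at different levels of the job configuration, which is easy to get wrong. The sketch below builds a query configuration dict in the shape used by the BigQuery jobs API; `build_query_job_config` is a hypothetical helper, and how a given provider version handles dry-run jobs should be checked before relying on it in a DAG.

```python
def build_query_job_config(sql, *, priority="BATCH", dry_run=False):
    """Build a `configuration` dict for a BigQuery query job."""
    if priority not in ("INTERACTIVE", "BATCH"):
        raise ValueError(f"unsupported priority: {priority}")
    config = {
        "query": {
            "query": sql,
            "useLegacySql": False,
            # Priority belongs inside the "query" section.
            "priority": priority,
        }
    }
    if dry_run:
        # dryRun sits at the top level of the configuration, next to "query".
        config["dryRun"] = True
    return config


etl_config = build_query_job_config(
    "SELECT COUNT(*) FROM `project.dataset.orders` WHERE order_date = DATE('{{ ds }}')"
)
estimate_config = build_query_job_config("SELECT 1", priority="INTERACTIVE", dry_run=True)
print(etl_config["query"]["priority"], estimate_config["dryRun"])
```

A dict built this way could be passed as the `configuration` argument shown in the earlier examples, keeping scheduled jobs on `BATCH` priority by default.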
Partition pruning reduces the amount of data scanned. Always include the partition column in WHERE clauses. For example: WHERE order_date = DATE('{{ ds }}') ensures only one partition is scanned instead of the entire table.
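Under on-demand pricing, BigQuery charges based on bytes scanned. Partitioned and clustered tables can cut query costs substantially; reductions of 10-100x are possible when filters select only a small fraction of the table. Combine them with materialized views for frequently run reports.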
Key Takeaways:
- BigQueryInsertJobOperator is the primary operator for all BQ operations (queries, loads, copies)
- GCSToBigQueryOperator handles file loading from GCS to BigQuery
- Partitioning reduces query scan from O(N) to O(N/P), where P is the partition count, when the query filters to a single partition
- Clustering provides additional optimization for filtered columns
- Always specify location to match dataset geography
- Use sensors with `mode='reschedule'` for efficient table existence checks
See Also
- Databricks Provider — Databricks cluster and job management
- Snowflake Provider — Snowflake integration patterns
- Operators and Hooks — Operator lifecycle and hook architecture
- XCom Communications — Task communication and data passing