Real-Time Analytics on GCP with Dataflow and BigQuery
Real-Time Architecture
Real-time analytics on GCP combines Pub/Sub for event ingestion, Dataflow for stream processing, and BigQuery for analytics storage.
Architecture Components
Pub/Sub:
- Ingests events from various sources
- Provides at-least-once delivery
- Supports message ordering and filtering
- Handles backpressure automatically
Dataflow Streaming:
- Processes events in real-time
- Handles windowing and triggers
- Manages watermarks and late data
- Provides exactly-once processing
BigQuery:
- Stores processed analytics data
- Supports streaming inserts
- Enables real-time querying
- Provides materialized views
Dashboards:
- Visualizes real-time metrics
- Provides interactive analytics
- Enables alerting and notifications
Streaming Inserts
Real-Time Data Ingestion
# Streaming pipeline with Dataflow
import apache_beam as beam
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime
def parse_event(message):
    """Decode a Pub/Sub payload containing UTF-8 JSON into a Python object.

    ``ReadFromPubSub`` emits raw ``bytes`` by default, and message objects
    exposing a ``.data`` attribute when ``with_attributes=True``. Both shapes
    are accepted, so this parser works regardless of how the read is configured.

    Args:
        message: Payload ``bytes``, or an object whose ``.data`` holds them.

    Returns:
        The decoded JSON value (a dict for the event payloads used here).

    Raises:
        UnicodeDecodeError: If the payload is not valid UTF-8.
        json.JSONDecodeError: If the payload is not valid JSON.
    """
    import json

    # bytes has no `.data` attribute, so this distinguishes the two shapes.
    payload = message.data if hasattr(message, 'data') else message
    return json.loads(payload.decode('utf-8'))
def run_streaming_pipeline():
    """Run a streaming Dataflow job that lands Pub/Sub events in BigQuery.

    Reads JSON events from the ``events`` topic, parses each one, assigns
    them to 1-minute fixed windows, reshapes them into BigQuery rows, and
    appends the rows to ``my-project:analytics.real_time_events``.
    """
    # PipelineOptions is used below but was never imported in this example.
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        '--project', 'my-project',
        '--region', 'us-central1',
        '--runner', 'DataflowRunner',
        '--streaming',
        '--temp_location', 'gs://my-bucket/temp',
        '--staging_location', 'gs://my-bucket/staging',
    ])
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            # with_attributes=True yields message objects whose `.data`
            # holds the payload bytes -- the shape parse_event reads.
            | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(
                topic='projects/my-project/topics/events',
                with_attributes=True,
            )
            | 'Parse Events' >> beam.Map(parse_event)
            | 'Window into 1 min' >> beam.WindowInto(FixedWindows(60))
            | 'Format for BQ' >> beam.Map(format_for_bigquery)
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                'my-project:analytics.real_time_events',
                schema='event_id:STRING,event_type:STRING,user_id:STRING,timestamp:TIMESTAMP',
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                # CREATE_NEVER: the destination table must already exist.
                create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER
            )
        )
def format_for_bigquery(event):
    """Project a parsed event onto the ``real_time_events`` row schema.

    Keys outside the schema are dropped. Missing string fields default to
    ``''``. A missing or empty ``timestamp`` becomes ``None`` (SQL NULL),
    because BigQuery cannot parse ``''`` as a TIMESTAMP and would reject
    the whole row.

    Args:
        event: Parsed event mapping.

    Returns:
        A dict with exactly the keys ``event_id``, ``event_type``,
        ``user_id`` and ``timestamp``.
    """
    return {
        'event_id': event.get('event_id', ''),
        'event_type': event.get('event_type', ''),
        'user_id': event.get('user_id', ''),
        'timestamp': event.get('timestamp') or None,
    }
BigQuery Streaming Inserts
# Direct streaming inserts to BigQuery
from google.cloud import bigquery
client = bigquery.Client()


def stream_to_bigquery(events):
    """Stream rows into ``my-project.analytics.real_time_events``.

    Uses ``client.insert_rows_json`` on the module-level BigQuery client.

    Args:
        events: Iterable of JSON-serialisable dicts, one per row. It is
            materialised into a list first because its length is needed
            for ``row_ids`` (a generator would otherwise break ``len``).

    Returns:
        The error list reported by ``insert_rows_json``, which is empty when
        every row was accepted. Returns ``[]`` without calling the API when
        *events* is empty.
    """
    table_id = 'my-project.analytics.real_time_events'
    rows = list(events)
    if not rows:
        # Nothing to send; skip the round trip (the insertAll backend is
        # expected to reject a request with no rows -- verify if relied on).
        print('No events to stream')
        return []
    errors = client.insert_rows_json(
        table_id,
        rows,
        # One None per row: no explicit insert ID is sent for any row.
        row_ids=[None] * len(rows),
    )
    if errors:
        print(f'Streaming errors: {errors}')
    else:
        print(f'Successfully streamed {len(rows)} events')
    # Return the errors so callers can retry or dead-letter failed rows
    # instead of relying on stdout.
    return errors
Windowed Aggregations
Time Window Patterns
# Fixed window aggregation
import json

from apache_beam.transforms.window import FixedWindows, SlidingWindows, Sessions

# Fixed 5-minute windows
fixed_window = FixedWindows(300)  # 5 minutes
# Sliding windows: 10-minute window, advance every 1 minute
sliding_window = SlidingWindows(600, 60)
# Session windows: 5-minute gap
session_window = Sessions(300)

# Pipeline with windowed aggregation: count events per type per minute.
# `pipeline` is assumed to be an existing beam.Pipeline.
# CombinePerKey needs (key, value) pairs, but ReadFromPubSub emits raw
# payload bytes, so each message is decoded and keyed first.
(
    pipeline
    | 'Read' >> beam.io.ReadFromPubSub(topic='topic')
    | 'Parse' >> beam.Map(json.loads)  # json.loads accepts UTF-8 bytes
    | 'Window' >> beam.WindowInto(FixedWindows(60))
    | 'Key by Type' >> beam.Map(lambda event: (event['event_type'], 1))
    | 'Aggregate by Type' >> beam.CombinePerKey(sum)
    # WriteToBigQuery expects dict rows, not (key, count) tuples.
    | 'Format' >> beam.Map(lambda kv: {'event_type': kv[0], 'event_count': kv[1]})
    | 'Write Results' >> beam.io.WriteToBigQuery(...)
)
Aggregation Patterns
# Common aggregation patterns: each entry reduces a sequence of numbers to a
# single value. Empty input yields 0 for 'average' and 'percentile', and
# None for 'min'/'max'.


def _count(values):
    return len(values)


def _total(values):
    return sum(values)


def _mean(values):
    if not values:
        return 0
    return sum(values) / len(values)


def _minimum(values):
    return None if not values else min(values)


def _maximum(values):
    return None if not values else max(values)


def _upper_median(values):
    # Despite the 'percentile' key, this is the 50th percentile: the middle
    # element of the sorted data (the upper one for even-length input).
    if not values:
        return 0
    ordered = sorted(values)
    return ordered[len(ordered) // 2]


aggregation_patterns = {
    'count': _count,
    'sum': _total,
    'average': _mean,
    'min': _minimum,
    'max': _maximum,
    'percentile': _upper_median,
}
# Real-time aggregation pipeline: total `amount` per event type per window.
# `pipeline` is assumed to be an existing beam.Pipeline.
import json

(
    pipeline
    | 'Read Events' >> beam.io.ReadFromPubSub(topic='topic')
    # ReadFromPubSub emits raw payload bytes; decode before field access.
    | 'Parse Events' >> beam.Map(json.loads)
    | 'Window' >> beam.WindowInto(FixedWindows(60))
    | 'Extract Key' >> beam.Map(lambda x: (x['event_type'], x['amount']))
    | 'Aggregate' >> beam.CombinePerKey(sum)
    # CombinePerKey emits (key, total) 2-tuples, so x[2] does not exist.
    # The window is not part of the element; ask Beam for it via WindowParam.
    | 'Format Results' >> beam.Map(
        lambda x, window=beam.DoFn.WindowParam: {
            'event_type': x[0],
            'total_amount': x[1],
            'window_end': window.end.to_rfc3339(),
        }
    )
    | 'Write to BQ' >> beam.io.WriteToBigQuery(...)
)
Materialized Views
Materialized views pre-aggregate data for faster queries.
Creating Materialized Views
-- Create materialized view for real-time aggregations (per type, user, minute).
-- Notes:
--   * Materialized view definitions may not use non-deterministic functions
--     such as CURRENT_TIMESTAMP(), so a rolling "last 7 days" filter cannot
--     live here. Apply it when querying the view, or bound retention with
--     partition expiration.
--   * The partition expression must reference a column the view outputs;
--     raw `timestamp` is only aggregated, so partition on `event_minute`.
--     NOTE(review): assumes the base table is time-partitioned on
--     `timestamp` so the view's partitioning can align with it -- confirm.
--   * Assumes the base table has an `amount` column; the schema in the
--     Dataflow example does not declare one -- TODO confirm.
CREATE MATERIALIZED VIEW `analytics.real_time_summary`
PARTITION BY DATE(event_minute)
CLUSTER BY event_type, user_id
OPTIONS (
  enable_refresh = true,
  refresh_interval_minutes = 1
)
AS
SELECT
  event_type,
  user_id,
  TIMESTAMP_TRUNC(timestamp, MINUTE) AS event_minute,
  COUNT(*) AS event_count,
  SUM(amount) AS total_amount,
  AVG(amount) AS avg_amount,
  MIN(timestamp) AS first_event,
  MAX(timestamp) AS last_event
FROM `analytics.real_time_events`
GROUP BY 1, 2, 3;
Materialized View Patterns
# Settings for a near-real-time materialized view, mirroring the OPTIONS
# clause of CREATE MATERIALIZED VIEW: refresh automatically every minute.
# NOTE(review): BigQuery's documented examples spell max_staleness as
# INTERVAL "h:m:s" HOUR TO SECOND -- confirm this literal is accepted.
mv_config = dict(
    enable_refresh=True,
    refresh_interval_minutes=1,
    max_staleness='INTERVAL 5 MINUTE',
    description='Real-time event aggregations',
)
# Monitor materialized view refresh
def monitor_mv_refresh(project_id, dataset_id, mv_id):
    """Print the refresh status of a BigQuery materialized view.

    Reads the dataset-scoped ``INFORMATION_SCHEMA.MATERIALIZED_VIEWS`` view.
    The original queried a non-existent ``MATERIALIZED_VIEW_OPTIONS`` view
    and non-existent columns, so it could never succeed.

    Args:
        project_id: GCP project that owns the dataset (trusted identifier).
        dataset_id: Dataset containing the view (trusted identifier).
        mv_id: Materialized view name; bound as a query parameter.

    Returns:
        The list of matching result rows (empty if the view was not found).
    """
    client = bigquery.Client()
    # project_id/dataset_id are identifiers and cannot be bound as query
    # parameters, so they are interpolated; callers must pass trusted values.
    # mv_id is a value, so it goes through a parameter instead of the SQL text.
    query = f"""
    SELECT
      table_name,
      last_refresh_time,
      refresh_watermark,
      last_refresh_status
    FROM `{project_id}.{dataset_id}.INFORMATION_SCHEMA.MATERIALIZED_VIEWS`
    WHERE table_name = @mv_id
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter('mv_id', 'STRING', mv_id)]
    )
    rows = list(client.query(query, job_config=job_config).result())
    if not rows:
        print(f'Materialized view not found: {project_id}.{dataset_id}.{mv_id}')
    for row in rows:
        print(f'Last refresh: {row.last_refresh_time}')
        print(f'Refresh watermark: {row.refresh_watermark}')
        # last_refresh_status is populated only when the last refresh failed.
        if row.last_refresh_status:
            print(f'Last refresh error: {row.last_refresh_status}')
    return rows
Dashboard Integration
Looker Integration
# Looker connection to BigQuery plus one explore per analytics table.
# Each explore's SQL table is the same-named table in the `analytics` dataset.
looker_config = {
    'connection': dict(
        name='my-bigquery-connection',
        type='bigquery',
        host='bigquery.googleapis.com',
        project_id='my-project',
        dataset='analytics',
    ),
    'explores': [
        {'name': table, 'label': label, 'sql_table_name': f'analytics.{table}'}
        for table, label in (
            ('real_time_events', 'Real-Time Events'),
            ('real_time_summary', 'Real-Time Summary'),
        )
    ],
}
Looker Studio (formerly Data Studio) Connection
# Connect to Looker Studio (formerly Data Studio): point a data source at the
# real_time_summary materialized view.
# NOTE(review): these keys look like an illustrative settings shape rather
# than a published schema -- confirm against whatever consumes this dict.
datastudio_config = dict(
    data_source=dict(
        type='BigQuery',
        project_id='my-project',
        dataset='analytics',
        table='real_time_summary',
    ),
    refresh_settings=dict(
        refresh_interval='REAL_TIME',
        max_refresh_interval=60,  # seconds
    ),
)
Monitoring and Alerting
Real-Time Monitoring
# Monitor streaming pipeline
from google.cloud import monitoring_v3
import time
def monitor_streaming_pipeline(project_id, job_id):
    """Print the latest element count for a Dataflow job over the past hour.

    Args:
        project_id: GCP project hosting the Dataflow job.
        job_id: Dataflow job ID (matched against the resource's job_id label).
    """
    client = monitoring_v3.MetricServiceClient()
    project_name = f"projects/{project_id}"

    # TimeInterval fields are protobuf Timestamps, so build them from whole
    # seconds + nanos rather than assigning the raw float from time.time().
    now = time.time()
    seconds = int(now)
    nanos = int((now - seconds) * 10**9)
    interval = monitoring_v3.TimeInterval(
        {
            "end_time": {"seconds": seconds, "nanos": nanos},
            "start_time": {"seconds": seconds - 3600, "nanos": nanos},
        }
    )

    results = client.list_time_series(
        request={
            'name': project_name,
            'filter': (
                'resource.type = "dataflow_job" AND '
                f'resource.labels.job_id = "{job_id}" AND '
                'metric.type = "dataflow.googleapis.com/job/element_count"'
            ),
            'interval': interval,
            'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL
        }
    )
    for result in results:
        if not result.points:
            continue
        # Points are listed newest-first, so points[0] is the latest sample.
        print(f'Elements processed: {result.points[0].value.int64_value}')
Alerting Policies
# Create alert for streaming delays
from google.cloud import monitoring_v3
client = monitoring_v3.AlertPolicyServiceClient()
# Plain string: the original f-prefix had no placeholders.
project_name = "projects/my-project"

# Fire when a Dataflow job's data watermark lags for 5 minutes straight.
# The condition is about watermark lag, so it watches job/data_watermark_age
# (seconds behind event time) rather than job/element_count, a throughput
# counter that says nothing about delay.
alert_policy = monitoring_v3.AlertPolicy(
    display_name="Streaming Delay Alert",
    conditions=[
        monitoring_v3.AlertPolicy.Condition(
            display_name="High watermark lag",
            condition_threshold=monitoring_v3.AlertPolicy.Condition.MetricThreshold(
                filter=(
                    'metric.type="dataflow.googleapis.com/job/data_watermark_age" '
                    'AND resource.type="dataflow_job"'
                ),
                comparison=monitoring_v3.ComparisonType.COMPARISON_GT,
                threshold_value=300,  # seconds of lag; tune to the job's latency SLO
                duration={"seconds": 300},
                aggregations=[
                    monitoring_v3.Aggregation(
                        alignment_period={"seconds": 60},
                        per_series_aligner=monitoring_v3.Aggregation.Aligner.ALIGN_MEAN
                    )
                ]
            )
        )
    ],
    # Empty: incidents open but nobody is notified until channels are added.
    notification_channels=[],
    alert_strategy=monitoring_v3.AlertPolicy.AlertStrategy(
        auto_close={"seconds": 1800}
    )
)

response = client.create_alert_policy(
    request={"name": project_name, "alert_policy": alert_policy}
)
Performance Optimization
Optimization Strategies
# Performance optimization techniques, keyed by the knob being tuned and
# mapped to a one-line description of how to tune it.
optimization_strategies = dict(
    window_size='Adjust window size based on latency requirements',
    batch_size='Optimize batch size for streaming inserts',
    compression='Enable compression for data transfer',
    partitioning='Partition BigQuery tables by time',
    clustering='Cluster by frequently filtered columns',
    materialized_views='Pre-aggregate common queries',
    auto_scaling='Enable autoscaling for Dataflow jobs',
)
# Optimize streaming pipeline: extra Dataflow flags for a streaming job.
# Streaming Engine is enabled once, via the first-class flag; the legacy
# '--experiments=enable_streaming_engine' spelling was a redundant duplicate.
optimized_options = [
    '--enable_streaming_engine',
    '--worker_machine_type=n1-standard-4',
    '--max_num_workers=20',
    '--autoscaling_algorithm=THROUGHPUT_BASED',
]
⚠️ Cost Alert
Monitor BigQuery spend continuously — for example, query INFORMATION_SCHEMA.JOBS for bytes billed per job — and set Cloud Billing budget alerts at 50%, 80%, and 100% of your budget.
Cost Optimization
# Cost optimization levers for real-time analytics: lever -> what it buys.
cost_optimization = dict(
    use_streaming_engine='Offload shuffle to streaming engine',
    optimize_window_size='Balance latency vs cost',
    use_materialized_views='Reduce query costs',
    monitor_slot_usage='Optimize BigQuery slot allocation',
    implement_auto_scaling='Scale resources based on load',
)
# Monitor costs
def monitor_streaming_costs(project_id):
    """Report which Cloud Billing account a project is billed to.

    The Cloud Billing API exposes the project-to-billing-account link, not
    itemised spend. Per-service costs require the Cloud Billing export to
    BigQuery, which is out of scope for this sketch.

    Args:
        project_id: GCP project to inspect.

    Returns:
        The ``ProjectBillingInfo`` message returned by the API.
    """
    from google.cloud import billing_v1

    client = billing_v1.CloudBillingClient()
    # The resource name is "projects/{id}"; the original ".../billingInfo"
    # suffix belongs to the REST URL path, not the resource name.
    billing_info = client.get_project_billing_info(name=f'projects/{project_id}')
    print(f'Monitoring costs for project: {project_id}')
    print(f'Billing account: {billing_info.billing_account_name or "<none>"}')
    print(f'Billing enabled: {billing_info.billing_enabled}')
    return billing_info
Best Practices
- Use appropriate window sizes - Balance latency with accuracy
- Configure triggers and allowed lateness - Handle late-arriving data relative to the runner-managed watermark
- Monitor pipeline health - Track throughput and latency
- Use materialized views - Pre-aggregate for faster queries
- Implement error handling - Use dead-letter patterns
- Optimize costs - Use streaming engine and autoscaling
- Test thoroughly - Validate with historical data