Dataflow Architecture
Google Cloud Dataflow is a fully managed service for executing Apache Beam pipelines. It provides unified batch and streaming processing with automatic resource management.
Architecture Overview
Apache Beam Programming Model
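Beam provides a unified programming model for batch and streaming.
Core Concepts
A Pipeline is a directed acyclic graph (DAG) of transforms. A PCollection is an immutable collection of elements that can be bounded (batch) or unbounded (streaming). A PTransform, such as ParDo, GroupByKey, or Combine, takes PCollections as input and produces new PCollections. Windowing groups elements by event time so that unbounded data can be aggregated, and a runner such as Dataflow, Spark, or Flink executes the pipeline.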
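As a rough illustration of the data shapes involved, the plain-Python sketch below mimics three core transforms on an in-memory list. It is not the Beam API, and the helper names are hypothetical. A ParDo-style step may emit several outputs per element, a GroupByKey-style step gathers values per key, and a per-key combine reduces each group.

```python
from collections import defaultdict


def par_do(elements, fn):
    """ParDo-style step: fn may yield zero, one, or many outputs per element."""
    return [output for element in elements for output in fn(element)]


def group_by_key(pairs):
    """GroupByKey-style step: gather every value that shares a key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


def combine_per_key(pairs, combine_fn):
    """CombinePerKey-style step: reduce each key's list of values with combine_fn."""
    return {key: combine_fn(values) for key, values in group_by_key(pairs).items()}


def extract_event_types(line):
    """Hypothetical element function: emit (event_type, 1) for each token in a line."""
    for event_type in line.split():
        yield (event_type, 1)


lines = ["click view click", "", "view purchase"]  # hypothetical input collection
pairs = par_do(lines, extract_event_types)
print(group_by_key(pairs))          # {'click': [1, 1], 'view': [1, 1], 'purchase': [1]}
print(combine_per_key(pairs, sum))  # {'click': 2, 'view': 2, 'purchase': 1}
```

In Beam the equivalents are beam.ParDo or beam.FlatMap, beam.GroupByKey, and beam.CombinePerKey. On Dataflow they run across many workers, and grouping by key implies a shuffle, which this sketch does not model.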
Batch Pipeline Example
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
def parse_json(element):
    """Parse JSON element."""
    import json
    return json.loads(element)
def format_for_bigquery(element):
    """Format element for BigQuery insertion."""
    return {
        'event_id': element['event_id'],
        'event_type': element['event_type'],
        'user_id': element['user_id'],
        'timestamp': element['timestamp'],
        'amount': float(element.get('amount', 0))
    }
def run_batch_pipeline():
    """Run batch ETL pipeline from GCS to BigQuery."""
    pipeline_options = PipelineOptions([
        '--project', 'my-project',
        '--runner', 'DataflowRunner',
        '--region', 'us-central1',
        '--temp_location', 'gs://my-bucket/temp/',
        '--staging_location', 'gs://my-bucket/staging/',
        '--machine_type', 'n1-standard-4',
        '--max_num_workers', '10',
        '--autoscaling_algorithm', 'THROUGHPUT_BASED'
    ])
    with beam.Pipeline(options=pipeline_options) as pipeline:
        # Read from GCS
        raw_data = (
            pipeline
            | 'Read from GCS' >> beam.io.ReadFromText(
                'gs://my-data-lake/raw/events/*.json'
            )
        )
        # Parse and transform
        parsed_data = (
            raw_data
            | 'Parse JSON' >> beam.Map(parse_json)
            | 'Format for BQ' >> beam.Map(format_for_bigquery)
        )
        # Write to BigQuery
        (
            parsed_data
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                'my-project:analytics.events',
                schema='event_id:STRING,event_type:STRING,user_id:STRING,timestamp:TIMESTAMP,amount:FLOAT64',
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
            )
        )
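Because parse_json and format_for_bigquery are ordinary Python functions, their logic can be checked locally before a job is submitted. The sketch below copies both functions from the pipeline above and runs them on a hypothetical sample line. Like the pipeline, it assumes each input line holds one JSON object.

```python
import json


def parse_json(element):
    """Parse one JSON line into a dict (same logic as the pipeline's parse_json)."""
    return json.loads(element)


def format_for_bigquery(element):
    """Keep only the BigQuery columns; a missing amount defaults to 0."""
    return {
        'event_id': element['event_id'],
        'event_type': element['event_type'],
        'user_id': element['user_id'],
        'timestamp': element['timestamp'],
        'amount': float(element.get('amount', 0)),
    }


# Hypothetical sample record with no 'amount' field.
sample = ('{"event_id": "e1", "event_type": "click", '
          '"user_id": "u42", "timestamp": "2024-01-01T00:00:00Z"}')
row = format_for_bigquery(parse_json(sample))
print(row['amount'])  # 0.0
```

With apache_beam installed, the same functions can also be run end to end on the DirectRunner and checked with apache_beam.testing.util.assert_that.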
Streaming Pipeline Example
import json
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import (
    AfterWatermark, AfterProcessingTime, AccumulationMode
)
from apache_beam.io.gcp.bigquery import WriteToBigQuery
def format_window_count(element, window=beam.DoFn.WindowParam):
    """Build a BigQuery row from an (event_type, count) pair and its window."""
    event_type, count = element
    return {
        'event_type': event_type,
        'count': count,
        # window.start is a Beam Timestamp; BigQuery accepts RFC 3339 strings.
        'window_start': window.start.to_rfc3339(),
    }
def run_streaming_pipeline():
    """Run streaming pipeline from Pub/Sub to BigQuery."""
    pipeline_options = PipelineOptions([
        '--project', 'my-project',
        '--runner', 'DataflowRunner',
        '--region', 'us-central1',
        '--temp_location', 'gs://my-bucket/temp/',
        '--streaming',
        '--enable_streaming_engine',
        '--num_workers', '4',
        '--max_num_workers', '20',
        '--autoscaling_algorithm', 'THROUGHPUT_BASED'
    ])
    with beam.Pipeline(options=pipeline_options) as pipeline:
        # Read from Pub/Sub (message payloads arrive as bytes)
        messages = (
            pipeline
            | 'Read Pub/Sub' >> beam.io.ReadFromPubSub(
                topic='projects/my-project/topics/events'
            )
            | 'Decode' >> beam.Map(lambda x: json.loads(x.decode('utf-8')))
        )
        # Apply windowing and aggregation
        windowed_counts = (
            messages
            | 'Window into 1 min' >> beam.WindowInto(
                FixedWindows(60),
                trigger=AfterWatermark(
                    early=AfterProcessingTime(30)
                ),
                accumulation_mode=AccumulationMode.DISCARDING
            )
            | 'Extract event type' >> beam.Map(
                lambda x: (x['event_type'], 1)
            )
            | 'Count per window' >> beam.CombinePerKey(sum)
        )
        # Write to BigQuery, stamping each row with its window start
        (
            windowed_counts
            | 'Format' >> beam.Map(format_window_count)
            | 'Write to BQ' >> WriteToBigQuery(
                'my-project:analytics.windowed_counts',
                schema='event_type:STRING,count:INT64,window_start:TIMESTAMP',
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
            )
        )
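FixedWindows(60) places an element with event timestamp ts into the window [start, start + 60), where start is ts rounded down to a multiple of 60 (the default offset is 0). The plain-Python sketch below is not Beam code and uses hypothetical epoch-second events. It mimics what the windowing and CombinePerKey(sum) steps above compute per window, including the window_start value, and contrasts the DISCARDING and ACCUMULATING modes across several trigger firings.

```python
from collections import Counter
from datetime import datetime, timezone

WINDOW_SIZE = 60  # seconds, matching FixedWindows(60)


def fixed_window_start(ts, size=WINDOW_SIZE, offset=0):
    """Start of the fixed window [start, start + size) that contains ts."""
    return ts - (ts - offset) % size


def count_per_window(events, size=WINDOW_SIZE):
    """Count elements per (window, event_type), like WindowInto + CombinePerKey(sum)."""
    counts = Counter((fixed_window_start(ts, size), event_type) for event_type, ts in events)
    return [
        {
            'event_type': event_type,
            'count': count,
            'window_start': datetime.fromtimestamp(start, tz=timezone.utc).isoformat(),
        }
        for (start, event_type), count in sorted(counts.items())
    ]


def emit_panes(new_per_firing, accumulating):
    """Values one key/window emits across trigger firings.

    new_per_firing: number of new elements that arrived before each firing.
    DISCARDING emits only the new elements; ACCUMULATING emits the running total.
    """
    emitted, total = [], 0
    for new in new_per_firing:
        total += new
        emitted.append(total if accumulating else new)
    return emitted


# Hypothetical (event_type, epoch-seconds) events.
events = [("click", 5), ("click", 59), ("view", 61), ("click", 120)]
for row in count_per_window(events):
    print(row)
print(emit_panes([3, 2, 1], accumulating=False))  # [3, 2, 1]
print(emit_panes([3, 2, 1], accumulating=True))   # [3, 5, 6]
```

With DISCARDING mode and an early trigger, one window can produce several rows in windowed_counts, one per firing. Summing count per (event_type, window_start) recovers the window total, whereas ACCUMULATING rows would each already hold the running total.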
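Best Practice: For streaming pipelines, enable Streaming Engine, which moves window state storage and streaming shuffle off the worker VMs and into the Dataflow service backend. Workers then need less CPU, memory, and Persistent Disk, and autoscaling responds faster to changes in load. Streaming Engine usage is billed separately, so the net cost effect depends on the workload. Use the --enable_streaming_engine flag in your pipeline options.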
FlexRS (Flexible Resource Scheduling)
FlexRS lowers the cost of batch workloads in exchange for scheduling flexibility: Dataflow queues the job and starts it within six hours of submission, running it on a mix of preemptible and regular VMs.
# FlexRS pipeline configuration
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = PipelineOptions([
    '--project', 'my-project',
    '--runner', 'DataflowRunner',
    '--region', 'us-central1',
    '--temp_location', 'gs://my-bucket/temp/',
    '--staging_location', 'gs://my-bucket/staging/',
    '--flex_resource_scheduling_goal', 'COST_OPTIMIZED',
    '--number_of_worker_harness_threads', '4',
    '--experiments', 'use_runner_v2'
])
Dataflow Templates
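Templates let you stage a pipeline once and launch it later with runtime parameters, without a development environment or any new code at launch time. Google provides prebuilt templates for common tasks, and you can package your own pipeline as a custom template (for example, a Flex Template).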
Pub/Sub to BigQuery Template
# Use Google's pre-built template
gcloud dataflow jobs run pubsub-to-bigquery \
    --gcs-location=gs://dataflow-templates/latest/PubSub_to_BigQuery \
    --region=us-central1 \
    --parameters=inputTopic=projects/my-project/topics/events,outputTableSpec=my-project:analytics.events,outputDeadletterTable=my-project:analytics.deadletter
Custom Template
# Create custom Dataflow template
import json
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
class CustomOptions(PipelineOptions):
    """Custom pipeline options for template."""
    @classmethod
    def _add_argparse_args(cls, parser):
        # Beam calls this hook to register custom options; do not override __init__.
        parser.add_argument(
            '--input_path',
            help='Input GCS path',
            default='gs://my-bucket/input/'
        )
        parser.add_argument(
            '--output_dataset',
            help='BigQuery dataset',
            default='analytics'
        )
def process_element(line):
    """Hypothetical processing step: parse one JSON line into a BigQuery row."""
    yield json.loads(line)
def run():
    """Run pipeline with custom options."""
    # With no arguments, PipelineOptions parses sys.argv, so template
    # parameters such as --input_path arrive as ordinary flags.
    pipeline_options = PipelineOptions()
    custom_options = pipeline_options.view_as(CustomOptions)
    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | 'Read' >> beam.io.ReadFromText(custom_options.input_path)
            | 'Process' >> beam.FlatMap(process_element)
            | 'Write' >> beam.io.WriteToBigQuery(
                f"my-project:{custom_options.output_dataset}.output",
                # Schema auto-detection requires the FILE_LOADS write method.
                schema='SCHEMA_AUTODETECT',
                method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
            )
        )
if __name__ == '__main__':
    run()
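# Build (publish) the Flex Template spec file from a prebuilt launcher image
# (the gs://my-bucket/templates/ path is a placeholder)
gcloud dataflow flex-templates build gs://my-bucket/templates/my-template.json \
    --image=gcr.io/my-project/custom-dataflow:latest \
    --sdk-language=PYTHON
# Launch a job from the published template
gcloud dataflow flex-templates run my-template \
    --template-file-gcs-location=gs://my-bucket/templates/my-template.json \
    --region=us-central1 \
    --parameters=input_path=gs://my-bucket/input/,output_dataset=analytics,sdk_container_image=gcr.io/my-project/custom-dataflow-sdk:latest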
Windowing and Triggers for Streaming
from apache_beam.transforms.window import (
    FixedWindows, SlidingWindows, Sessions, TimestampedValue
)
from apache_beam.transforms.trigger import (
    AfterWatermark, AfterProcessingTime, AfterCount,
    AccumulationMode, Repeatedly, AfterAny
)
# Fixed Windows (tumbling)
FixedWindows(60)  # 1-minute fixed windows
# Sliding Windows (overlapping)
SlidingWindows(60, 10)  # 1-minute windows sliding every 10 seconds
# Session Windows (activity-based)
Sessions(gap_size=300)  # a session closes after 5 minutes (300 s) with no new events
# Triggers
# Fire when the watermark passes the end of the window
AfterWatermark(
    early=AfterProcessingTime(30),  # early results 30 s after a pane's first element, repeated
    late=AfterCount(1)  # fire again for each late element; needs allowed_lateness > 0 in WindowInto
)
# Composite trigger: repeatedly fire at whichever comes first,
# the watermark passing the window end or 60 s of processing time
Repeatedly(
    AfterAny(
        AfterWatermark(),
        AfterProcessingTime(60)
    )
)
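The three window functions differ only in how they map an element's timestamp to windows. This plain-Python sketch is a simplified model, not Beam's implementation, and uses hypothetical epoch-second timestamps. It shows the assignment rules behind SlidingWindows(60, 10) and Sessions(gap_size=300).

```python
def sliding_window_starts(ts, size, period, offset=0):
    """Starts of every sliding window [start, start + size) that contains ts."""
    last_start = ts - (ts - offset) % period  # latest window start at or before ts
    starts = []
    start = last_start
    while start > ts - size:  # this window still covers ts
        starts.append(start)
        start -= period
    return sorted(starts)


def merge_sessions(timestamps, gap):
    """Give each element the window [ts, ts + gap) and merge overlapping windows."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            # Overlaps the current session: extend its end if needed.
            sessions[-1][1] = max(sessions[-1][1], ts + gap)
        else:
            sessions.append([ts, ts + gap])
    return [tuple(session) for session in sessions]


# SlidingWindows(60, 10): the element at t=125 falls into 60 / 10 = 6 windows.
print(sliding_window_starts(125, size=60, period=10))  # [70, 80, 90, 100, 110, 120]
# Sessions(gap_size=300): events at 0, 100, 350 form one session; 1000 starts another.
print(merge_sessions([0, 100, 350, 1000], gap=300))     # [(0, 650), (1000, 1300)]
```

Because every element lands in size / period sliding windows, a sliding-window aggregation does proportionally more work than a fixed-window one of the same size.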
Autoscaling and Performance
Cost Tip: Dataflow bills worker vCPU-hours, memory GB-hours, and Persistent Disk, plus usage of Dataflow Shuffle (batch) or Streaming Engine (streaming). For cost optimization: 1) use FlexRS, which runs on a mix of preemptible and regular VMs at a discounted rate, for non-urgent batch jobs, 2) right-size machine types, 3) enable Streaming Engine for streaming workloads.
Common Interview Questions
Q1: What is the difference between Dataflow and Dataproc?
Answer: Dataflow is serverless and fully managed: there is no cluster to manage, and autoscaling is automatic. Dataproc provides managed Spark/Hadoop clusters with more control. Use Dataflow for new development (unified batch/streaming), and Dataproc for existing Spark/Hadoop workloads or when you need cluster-level control.
Q2: Explain the Apache Beam programming model.
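Answer: Beam represents data as PCollections (immutable collections that may be bounded or unbounded) and processing as Transforms applied to them. Core transforms include ParDo (general element-wise processing, similar to map/flatMap), GroupByKey (a shuffle that groups values by key), Combine (aggregation such as sum or count), and WindowInto (assigning elements to event-time windows). A pipeline is a DAG of transforms, and the same pipeline code can run in batch or streaming mode on different runners (Dataflow, Spark, Flink).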
Q3: When would you use FlexRS vs. standard Dataflow?
Answer: FlexRS is for non-urgent batch workloads that can tolerate a delayed start (Dataflow queues the job and starts it within six hours). It provides up to 50% cost savings. Use FlexRS for daily aggregations, data warehouse refreshes, and backfill operations. Standard Dataflow is for latency-sensitive production ETL and for streaming workloads, which FlexRS does not support.
Q4: How does Dataflow handle exactly-once processing?
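Answer: Within the pipeline, Dataflow provides exactly-once processing semantics through: 1) durable, checkpointed state, so processing resumes after a worker failure without losing or re-applying state updates, 2) deduplication of records by unique ID, so elements that are retried or redelivered (for example, by Pub/Sub) take effect only once, 3) deterministic user transforms, because failed bundles are retried and must produce the same output on re-execution. Effects on external systems are exactly-once only when the sink is idempotent or transactional.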
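One mechanism behind exactly-once results is deduplication by a unique record ID: a stage remembers which IDs it has already applied and drops redelivered copies, so a retry does not double-count. The sketch below is a simplified plain-Python model under stated assumptions (an in-memory set of IDs with no expiry, hypothetical message IDs), not Dataflow's implementation, which keeps this state durably.

```python
def process_exactly_once(deliveries, seen_ids=None):
    """Apply each record once per unique ID, ignoring redelivered duplicates.

    deliveries: iterable of (record_id, value); retries may repeat a record.
    Returns the applied values and the set of IDs seen so far.
    """
    seen_ids = set() if seen_ids is None else seen_ids  # in-memory stand-in for durable state
    applied = []
    for record_id, value in deliveries:
        if record_id in seen_ids:
            continue  # duplicate from a retry: already applied
        seen_ids.add(record_id)
        applied.append(value)
    return applied, seen_ids


# Hypothetical deliveries where m1 and m2 are redelivered after a retry.
deliveries = [("m1", 5), ("m2", 3), ("m1", 5), ("m3", 2), ("m2", 3)]
applied, _ = process_exactly_once(deliveries)
print(sum(applied))  # 10
```

The model also shows why determinism matters: deduplication compares IDs only, so a retried record must carry the same ID and value as the original for the total to stay correct.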
Q5: What is Streaming Engine and when should you use it?
Answer: Streaming Engine moves state storage and streaming shuffle from worker VMs into Google's managed service backend. Workers need less CPU, memory, and Persistent Disk, autoscaling responds faster, and service updates can be applied without redeploying the pipeline. Use Streaming Engine for all streaming pipelines; it is enabled with --enable_streaming_engine.