
Dataflow: Apache Beam, Streaming & Batch Processing


Google Cloud Dataflow Deep Dive

Master Google Cloud Dataflow including Apache Beam programming model, streaming, batch, FlexRS, templates, and enterprise pipeline patterns.

25 min read · Advanced

Dataflow Architecture

Google Cloud Dataflow is a fully managed service for executing Apache Beam pipelines. It provides unified batch and streaming processing with automatic resource management.

Architecture Overview

Dataflow vs Dataproc: When to Use What

Dataflow: Apache Beam (Serverless)
✓ Fully managed, no cluster setup
✓ Auto-scaling (up and down)
✓ Unified stream + batch
✓ Exactly-once processing
✓ Pay per CPU/GB-second
✗ Limited customization
✗ Harder to debug
✗ Vendor lock-in (Beam)
Use for: New pipelines, streaming, ETL jobs, serverless-first teams

Dataproc: Spark/Hadoop (Managed)
✓ Full Spark/Hadoop ecosystem
✓ Easy migration from on-prem
✓ Custom scripts & libraries
✓ Preemptible VMs (91% off)
✓ Jupyter/Zeppelin built-in
✗ Cluster management needed
✗ Manual scaling
✗ Idle cluster costs money
Use for: Existing Spark code, ML workloads, lift-and-shift from on-prem Hadoop

Apache Beam Programming Model

Beam provides a unified programming model for batch and streaming. Key concepts:

Core Concepts

Cloud Composer (Airflow) Architecture

[Figure: Cloud Composer, managed Apache Airflow on GCP. Panels: GKE cluster (Scheduler, Web Server, Celery Workers, PostgreSQL Metadata DB); Environment config (machine type, node count, Airflow 2.x, environment variables); DAGs & operators (DAG files in the dags/ bucket, operators, hooks, sensors); GCP service integrations (BigQuery, Cloud Storage, Dataflow, Dataproc, Pub/Sub, Cloud Functions, Vertex AI); Version management & upgrades (auto-upgrade of minor versions, manual upgrade of major versions, canary deploy, rollback).]
Interview Tip: Cloud Composer runs Airflow on GKE, so you manage the DAGs while Google manages the cluster. Use provider hooks such as BigQueryHook and GCSHook for GCP integrations. Store DAGs in a GCS bucket. Use environment variables for project-wide config. Prefer Cloud Composer over self-managed Airflow for production workloads.

Batch Pipeline Example

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def parse_json(element):
    """Parse JSON element."""
    import json
    return json.loads(element)

def format_for_bigquery(element):
    """Format element for BigQuery insertion."""
    return {
        'event_id': element['event_id'],
        'event_type': element['event_type'],
        'user_id': element['user_id'],
        'timestamp': element['timestamp'],
        'amount': float(element.get('amount', 0))
    }

def run_batch_pipeline():
    """Run batch ETL pipeline from GCS to BigQuery."""
    pipeline_options = PipelineOptions([
        '--project', 'my-project',
        '--runner', 'DataflowRunner',
        '--region', 'us-central1',
        '--temp_location', 'gs://my-bucket/temp/',
        '--staging_location', 'gs://my-bucket/staging/',
        '--machine_type', 'n1-standard-4',
        '--max_num_workers', '10',
        '--autoscaling_algorithm', 'THROUGHPUT_BASED'
    ])

    with beam.Pipeline(options=pipeline_options) as pipeline:
        # Read from GCS
        raw_data = (
            pipeline
            | 'Read from GCS' >> beam.io.ReadFromText(
                'gs://my-data-lake/raw/events/*.json'
            )
        )

        # Parse and transform
        parsed_data = (
            raw_data
            | 'Parse JSON' >> beam.Map(parse_json)
            | 'Format for BQ' >> beam.Map(format_for_bigquery)
        )

        # Write to BigQuery
        (
            parsed_data
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                'my-project:analytics.events',
                schema='event_id:STRING,event_type:STRING,user_id:STRING,timestamp:TIMESTAMP,amount:FLOAT64',
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
            )
        )
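
The two helper functions in the batch example are plain Python, so they can be exercised locally before any Dataflow job is submitted. The sketch below repeats them and adds a hypothetical `safe_transform` wrapper (not part of the original pipeline) that collects malformed lines in a dead-letter list instead of raising. The sample records are made up for illustration.

```python
import json

def parse_json(element):
    """Parse one JSON line into a dict."""
    return json.loads(element)

def format_for_bigquery(element):
    """Keep only the columns the BigQuery table expects."""
    return {
        'event_id': element['event_id'],
        'event_type': element['event_type'],
        'user_id': element['user_id'],
        'timestamp': element['timestamp'],
        'amount': float(element.get('amount', 0)),
    }

def safe_transform(lines):
    """Hypothetical helper: split input into good rows and rejected lines."""
    rows, dead_letters = [], []
    for line in lines:
        try:
            rows.append(format_for_bigquery(parse_json(line)))
        except (ValueError, KeyError, TypeError) as err:
            # ValueError covers json.JSONDecodeError; KeyError covers
            # missing required fields; TypeError covers non-object JSON.
            dead_letters.append({'raw': line, 'error': repr(err)})
    return rows, dead_letters

sample = [
    '{"event_id": "e1", "event_type": "click", "user_id": "u1", '
    '"timestamp": "2024-01-01T00:00:00Z", "amount": "2.5"}',
    '{"event_id": "e2"}',  # missing required fields
    'not json',            # malformed
]
rows, dead_letters = safe_transform(sample)
print(len(rows), len(dead_letters))  # 1 2
```

Inside a Beam pipeline the same separation is usually expressed with tagged outputs, so that rejected records can be written to their own sink.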

Streaming Pipeline Example

import json

import apache_beam as beam
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.trigger import (
    AfterWatermark, AfterProcessingTime, AccumulationMode
)
from apache_beam.transforms.window import FixedWindows

def run_streaming_pipeline():
    """Run streaming pipeline from Pub/Sub to BigQuery."""
    pipeline_options = PipelineOptions([
        '--project', 'my-project',
        '--runner', 'DataflowRunner',
        '--region', 'us-central1',
        '--temp_location', 'gs://my-bucket/temp/',
        '--streaming',
        '--enable_streaming_engine',
        '--num_workers', '4',
        '--max_num_workers', '20',
        '--autoscaling_algorithm', 'THROUGHPUT_BASED'
    ])

    with beam.Pipeline(options=pipeline_options) as pipeline:
        # Read from Pub/Sub
        messages = (
            pipeline
            | 'Read Pub/Sub' >> beam.io.ReadFromPubSub(
                topic='projects/my-project/topics/events'
            )
            | 'Decode' >> beam.Map(lambda x: json.loads(x.decode('utf-8')))
        )

        # Apply windowing and aggregation
        windowed_counts = (
            messages
            | 'Window into 1 min' >> beam.WindowInto(
                FixedWindows(60),
                trigger=AfterWatermark(
                    early=AfterProcessingTime(30)
                ),
                accumulation_mode=AccumulationMode.DISCARDING
            )
            | 'Extract event type' >> beam.Map(
                lambda x: (x['event_type'], 1)
            )
            | 'Count per window' >> beam.CombinePerKey(sum)
        )

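        # Write to BigQuery
        (
            windowed_counts
            # DoFn.WindowParam injects the element's window, so the window
            # start can be stored next to the count. With early firings in
            # DISCARDING mode, a window may produce several partial rows.
            | 'Format' >> beam.Map(
                lambda kv, window=beam.DoFn.WindowParam: {
                    'event_type': kv[0],
                    'count': kv[1],
                    'window_start': window.start.to_utc_datetime().isoformat()
                }
            )
            | 'Write to BQ' >> WriteToBigQuery(
                'my-project:analytics.windowed_counts',
                schema='event_type:STRING,count:INT64,window_start:TIMESTAMP',
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
            )
        )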
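
The streaming example sets AccumulationMode.DISCARDING together with an early trigger, so each firing carries only the elements that arrived since the previous firing. The plain-Python sketch below contrasts that with accumulating mode for a single key in a single window. It does not use Beam, and the `simulate_panes` helper and arrival counts are hypothetical.

```python
def simulate_panes(batches, accumulating):
    """Return the count emitted at each trigger firing.

    `batches` lists how many elements arrived between consecutive firings.
    """
    panes, running_total = [], 0
    for arrived in batches:
        running_total += arrived
        # Accumulating panes repeat everything seen so far; discarding
        # panes only carry the newly arrived elements.
        panes.append(running_total if accumulating else arrived)
    return panes

arrivals = [3, 2, 4]  # elements seen before the 1st, 2nd and final firing
discarding = simulate_panes(arrivals, accumulating=False)
accumulating = simulate_panes(arrivals, accumulating=True)
print(discarding)    # [3, 2, 4]
print(accumulating)  # [3, 5, 9]
```

When discarding panes are appended to a table, the per-window total is the sum of the rows for that window; with accumulating panes it is the value of the latest row.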


Best Practice: For streaming pipelines, enable Streaming Engine to separate state management from worker processing. This reduces costs by 30-50% and improves autoscaling. Use --enable_streaming_engine flag in your pipeline options.

FlexRS (Flexible Resource Scheduling)

FlexRS lowers the cost of batch workloads by letting Dataflow delay the start of a job (it is queued and launched within six hours of submission) and run it on a mix of preemptible and standard VMs.

GCP Pricing Models for Data Engineering

Model | Discount | Terms | Typical use
On-Demand | 0% | Pay per use, no commitment | Dev/Test
Committed (1yr) | Up to 37% | 1-year commitment | Steady production
Committed (3yr) | Up to 55% | 3-year commitment | Long-term infra
Preemptible/Spot | Up to 91% | Short-lived VMs | Batch processing
Sustained Use | Up to 30% | Auto discounts for long use | Always-on
Serverless | N/A | Pay per query/invocation | Event-driven

# FlexRS pipeline configuration
from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options = PipelineOptions([
    '--project', 'my-project',
    '--runner', 'DataflowRunner',
    '--region', 'us-central1',
    '--temp_location', 'gs://my-bucket/temp/',
    '--staging_location', 'gs://my-bucket/staging/',
    # In the Python SDK the FlexRS option is --flexrs_goal
    '--flexrs_goal', 'COST_OPTIMIZED',
    '--number_of_worker_harness_threads', '4',
    '--experiments', 'use_runner_v2'
])

Dataflow Templates

Templates are pre-built pipelines that can be executed without writing code.

Pub/Sub to BigQuery Template

# Use Google's pre-built template
# --parameters takes one comma-separated list with no spaces or line breaks
gcloud dataflow jobs run pubsub-to-bigquery \
  --gcs-location=gs://dataflow-templates/latest/PubSub_to_BigQuery \
  --region=us-central1 \
  --parameters=inputTopic=projects/my-project/topics/events,outputTableSpec=my-project:analytics.events,outputDeadletterTable=my-project:analytics.deadletter

Custom Template

# Create custom Dataflow template
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class CustomOptions(PipelineOptions):
    """Custom pipeline options for template."""
    @classmethod
    def _add_argparse_args(cls, parser):
        # PipelineOptions subclasses register their flags through this
        # hook rather than in __init__.
        parser.add_argument(
            '--input_path',
            help='Input GCS path or glob',
            default='gs://my-bucket/input/*'
        )
        parser.add_argument(
            '--output_dataset',
            help='BigQuery dataset',
            default='analytics'
        )

def process_element(line):
    """Placeholder transform: parse one JSON line into a row dict."""
    yield json.loads(line)

def run():
    """Run pipeline with custom options."""
    pipeline_options = PipelineOptions()
    custom_options = pipeline_options.view_as(CustomOptions)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | 'Read' >> beam.io.ReadFromText(custom_options.input_path)
            | 'Process' >> beam.FlatMap(process_element)
            | 'Write' >> beam.io.WriteToBigQuery(
                f"my-project:{custom_options.output_dataset}.output",
                # Auto-detection applies to batch file loads; pass an
                # explicit schema string when the columns are known.
                schema='SCHEMA_AUTODETECT',
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
            )
        )

if __name__ == '__main__':
    run()

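# Publish template (the spec path and metadata.json are placeholders)
gcloud dataflow flex-template build gs://my-bucket/templates/my-template.json \
  --image=gcr.io/my-project/custom-dataflow:latest \
  --sdk-language=PYTHON \
  --metadata-file=metadata.json
# Run a job from the published template
gcloud dataflow flex-template run my-template \
  --template-file-gcs-location=gs://my-bucket/templates/my-template.json \
  --region=us-central1 \
  --parameters='input_path=gs://my-bucket/input/*,output_dataset=analytics'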

Windowing and Triggers for Streaming

from apache_beam.transforms.window import (
    FixedWindows, SlidingWindows, Sessions, TimestampedValue
)
from apache_beam.transforms.trigger import (
    AfterWatermark, AfterProcessingTime, AfterCount,
    AccumulationMode, Repeatedly, AfterAny
)

# Fixed Windows (tumbling)
FixedWindows(60)  # 1-minute fixed windows

# Sliding Windows (overlapping)
SlidingWindows(60, 10)  # 1-minute windows sliding every 10 seconds

# Session Windows (activity-based)
Sessions(gap_size=300)  # 5-minute gap between events

# Triggers
# Fire when watermark passes window end
AfterWatermark(
    early=AfterProcessingTime(30),  # Early results every 30 sec
    late=AfterCount(1)  # Late data trigger
)

# Composite trigger
Repeatedly(
    AfterAny(
        AfterWatermark(),
        AfterProcessingTime(60)
    )
)
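
To make the window definitions above concrete, the following plain-Python sketch computes which windows an event timestamp (in seconds) falls into. It follows the usual semantics of epoch-aligned windows with an inclusive start and exclusive end, but it is an illustration under those assumptions, not Beam's implementation. All function names are hypothetical.

```python
def fixed_window(ts, size):
    """Return the [start, end) fixed window containing timestamp ts."""
    start = ts - (ts % size)
    return (start, start + size)

def sliding_windows(ts, size, period):
    """Return every [start, end) sliding window containing timestamp ts."""
    windows = []
    start = ts - (ts % period)  # latest window start at or before ts
    while start > ts - size:
        windows.append((start, start + size))
        start -= period
    return windows

def session_windows(timestamps, gap):
    """Merge timestamps into sessions separated by at least `gap` seconds."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            # Event lands inside the open session: extend its end.
            sessions[-1] = (sessions[-1][0], ts + gap)
        else:
            sessions.append((ts, ts + gap))
    return sessions

print(fixed_window(125, 60))                     # (120, 180)
print(len(sliding_windows(125, 60, 10)))         # 6
print(session_windows([0, 100, 250, 700], 300))  # [(0, 550), (700, 1000)]
```

The sliding case shows why SlidingWindows(60, 10) multiplies data volume: every element belongs to size / period = 6 windows, and downstream aggregations process it once per window.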

Autoscaling and Performance

Cost Tip: Dataflow bills worker vCPU, memory, and persistent disk by usage time, plus the data processed by Dataflow Shuffle (batch) or Streaming Engine (streaming). For cost optimization: 1) Use FlexRS for non-urgent batch jobs, which runs them on a mix of preemptible and standard VMs, 2) Right-size machine types, 3) Enable Streaming Engine for streaming workloads.
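
The billing dimensions in the cost tip scale with worker count, machine shape, and job duration. The sketch below shows that arithmetic for the vCPU and memory components only. The unit prices are placeholders chosen for illustration, not actual Dataflow rates, and `estimate_batch_cost` is a hypothetical helper; check the current pricing page for your region before relying on any figure.

```python
def estimate_batch_cost(workers, vcpus_per_worker, memory_gb_per_worker,
                        hours, vcpu_hour_price, gb_hour_price):
    """Rough worker cost: vCPU-hours and GB-hours times their unit prices."""
    vcpu_hours = workers * vcpus_per_worker * hours
    gb_hours = workers * memory_gb_per_worker * hours
    return vcpu_hours * vcpu_hour_price + gb_hours * gb_hour_price

# Placeholder prices for illustration only; NOT actual Dataflow rates.
VCPU_HOUR_PRICE = 0.05
GB_HOUR_PRICE = 0.004

# 10 workers shaped like n1-standard-4 (4 vCPUs, 15 GB) running for 2 hours.
cost = estimate_batch_cost(10, 4, 15, 2, VCPU_HOUR_PRICE, GB_HOUR_PRICE)
print(f"{cost:.2f}")  # 5.20
```

Halving either the worker count or the runtime halves this estimate, which is why right-sizing machine types and capping `--max_num_workers` are usually the first settings to review.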


Common Interview Questions

Q1: What is the difference between Dataflow and Dataproc?

Answer: Dataflow is serverless and managed: no cluster management, automatic autoscaling. Dataproc provides managed Spark/Hadoop clusters with more control. Use Dataflow for new development (unified batch/streaming), Dataproc for existing Spark/Hadoop workloads or when you need cluster-level control.

Q2: Explain the Apache Beam programming model.

Answer: Beam uses PCollections (immutable data collections) and Transforms (operations on PCollections). Core transforms include ParDo (map), GroupByKey (reduce), Combine (aggregate), and Window (time-based). Pipelines are DAGs of transforms. The same pipeline runs on both batch and streaming runners (Dataflow, Spark, Flink).
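
As a mental model for this answer, the following plain-Python sketch imitates what ParDo, GroupByKey, and Combine each contribute, using lists in place of PCollections. It is an analogy only: the helper names are hypothetical, nothing here uses the Beam SDK, and nothing runs in parallel.

```python
from collections import defaultdict

def par_do(collection, fn):
    """Element-wise step: fn may emit zero or more outputs per input."""
    return [out for element in collection for out in fn(element)]

def group_by_key(pairs):
    """Collect all values that share a key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return sorted(grouped.items())

def combine_per_key(grouped, combine_fn):
    """Reduce each key's values to a single result."""
    return [(key, combine_fn(values)) for key, values in grouped]

events = ['click', 'view', 'click', 'purchase', 'click']
keyed = par_do(events, lambda e: [(e, 1)])
counts = combine_per_key(group_by_key(keyed), sum)
print(counts)  # [('click', 3), ('purchase', 1), ('view', 1)]
```

In Beam the same shape appears in the streaming example as `beam.Map(lambda x: (x['event_type'], 1))` followed by `beam.CombinePerKey(sum)`.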

Q3: When would you use FlexRS vs. standard Dataflow?

Answer: FlexRS is for non-urgent batch workloads that can tolerate a delayed start: Dataflow queues the job and launches it within 6 hours of submission. It provides up to 50% cost savings. Use FlexRS for daily aggregations, data warehouse refreshes, and backfill operations. Standard Dataflow is for latency-sensitive production ETL and streaming workloads.

Q4: How does Dataflow handle exactly-once processing?

Answer: Dataflow provides exactly-once processing semantics through: 1) Durable execution (state persisted to persistent storage), 2) Idempotent transforms (re-execution produces same results), 3) Deterministic transforms (same input always produces same output), 4) Checkpointing (periodic state saves).
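
The answer above does not spell out Dataflow's internal mechanism, so the following is only a generic illustration of one ingredient: why deduplicating by a record ID makes retried deliveries safe. The `DedupingSink` class and the record IDs are hypothetical.

```python
class DedupingSink:
    """Hypothetical sink that applies each record ID at most once."""

    def __init__(self):
        self.seen_ids = set()
        self.total = 0

    def write(self, record_id, amount):
        if record_id in self.seen_ids:
            return False  # duplicate delivery from a retry: ignore it
        self.seen_ids.add(record_id)
        self.total += amount
        return True

sink = DedupingSink()
deliveries = [('r1', 10), ('r2', 5), ('r1', 10)]  # r1 is delivered twice
applied = [sink.write(record_id, amount) for record_id, amount in deliveries]
print(applied, sink.total)  # [True, True, False] 15
```

Without the ID check the retried record would be counted twice and the total would be 25. The same reasoning explains why writes to external systems need to be idempotent even when the pipeline itself processes each record once.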

Q5: What is Streaming Engine and when should you use it?

Answer: Streaming Engine offloads state management and shuffle operations from worker VMs to Google's managed service. It reduces costs by 30-50%, improves autoscaling, and eliminates worker VM limitations. Use Streaming Engine for all streaming pipelines; it's enabled with --enable_streaming_engine.
