
Google Cloud Dataflow for Batch and Stream Processing

Figure: Dataflow pipeline architecture. Source (GCS, Pub/Sub, BigQuery) → PTransform (Map, Filter, Group) → Window (time-based, session, global) → Trigger (early, late, custom) → Sink (BigQuery, GCS, Pub/Sub), with supporting components for autoscaling (dynamic worker pool), monitoring (Stackdriver metrics), watermarks (event-time processing) and state management (keyed state).

Apache Beam Programming Model

Apache Beam provides a unified programming model for batch and stream processing. Dataflow is Google's managed service for running Beam pipelines.

Core Concepts

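PCollection: Immutable, distributed dataset
PTransform: Operation that takes one or more PCollections as input and produces PCollections as output
Pipeline: The full graph of PTransforms and PCollections, from reading input to writing output
Runner: Execution engine that runs the pipeline (DataflowRunner, DirectRunner, SparkRunner, ...)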

Pipeline Structure

πŸ—οΈ GCP Data Engineering Reference Architecture
DATA SOURCESπŸ—ƒοΈOn-Prem DB☁️SaaS APIsπŸ“‘IoT SensorsπŸ“±Mobile AppsπŸ”ŒREST APIsINGESTION LAYERDataflow (CDC)Pub/SubCloud TasksStorage TransferTransfer ApplianceRAW DATA ZONE (Cloud Storage)landing/Ingested databronze/Unvalidatedarchive/Historicalraw/Original formatstaging/Temp processingPROCESSING LAYERDataflowStream + BatchDataprocSpark/HadoopCloud FunctionsEvent-drivenData PrepVisual ETLCloud ComposerOrchestrateCURATED DATA ZONEsilver/Cleaned, validatedgold/Business-readyaggregates/Pre-computedfeatures/ML featuresBigQuery (Warehouse)Looker (BI)Vertex AI (ML)Data StudioDataplex
Interview Tip: GCP's data engineering stack is serverless-first. Dataflow (Apache Beam) handles both streaming and batch. BigQuery is the flagship analytics service.

Basic Pipeline Examples

Batch Processing Pipeline

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def parse_csv(line):
    """Parse CSV line into dictionary."""
    fields = line.split(',')
    return {
        'id': fields[0],
        'name': fields[1],
        'amount': float(fields[2]),
        'timestamp': fields[3]
    }

def format_for_bigquery(element):
    """Format element for BigQuery insertion."""
    return {
        'id': element['id'],
        'name': element['name'],
        'amount': element['amount'],
        'processed_at': element['timestamp']
    }

def run_batch_pipeline():
    options = PipelineOptions([
        '--project', 'my-project',
        '--region', 'us-central1',
        '--runner', 'DataflowRunner',
        '--temp_location', 'gs://my-bucket/temp',
        '--staging_location', 'gs://my-bucket/staging',
        '--num_workers', '4',
        '--max_num_workers', '8',
        '--autoscaling_algorithm', 'THROUGHPUT_BASED'
    ])

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | 'Read CSV' >> beam.io.ReadFromText('gs://my-bucket/input/*.csv')
            | 'Parse CSV' >> beam.Map(parse_csv)
            | 'Filter Valid' >> beam.Filter(lambda x: x['amount'] > 0)
            | 'Format for BQ' >> beam.Map(format_for_bigquery)
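            | 'Write to BQ' >> beam.io.WriteToBigQuery(
                'my-project:analytics.processed_data',
                schema='id:STRING,name:STRING,amount:FLOAT64,processed_at:TIMESTAMP',
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                # CREATE_IF_NEEDED uses the schema above to create a missing table;
                # CREATE_NEVER would ignore the schema and fail if the table is absent.
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
            )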
        )

if __name__ == '__main__':
    run_batch_pipeline()
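The `parse_csv` helper above splits on every comma, so a quoted field such as `"Smith, Jane"` would be cut in two and shift the amount column. A minimal sketch that swaps in Python's standard `csv` module, assuming the same four-column layout (id, name, amount, timestamp) that the original parser expects:

```python
import csv

def parse_csv(line):
    """Parse one CSV line into a dict, honouring quoted fields."""
    # csv.reader expects an iterable of lines, so wrap the single line in a list.
    fields = next(csv.reader([line]))
    return {
        'id': fields[0],
        'name': fields[1],
        'amount': float(fields[2]),
        'timestamp': fields[3],
    }

row = parse_csv('42,"Smith, Jane",19.99,2024-01-15T10:30:00Z')
print(row['name'])  # Smith, Jane
```

Because the signature is unchanged, this version could replace the original in the `'Parse CSV' >> beam.Map(parse_csv)` step. A malformed row still raises an exception, which is where a dead-letter output becomes useful.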

Stream Processing Pipeline

import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import (
    AfterWatermark, AfterProcessingTime, AccumulationMode
)

def parse_pubsub_message(message):
    """Parse a Pub/Sub payload containing a JSON event."""
    # Without with_attributes=True, ReadFromPubSub yields the raw payload bytes.
    data = json.loads(message.decode('utf-8'))
    return {
        'user_id': data['user_id'],
        'action': data['action'],
        'amount': data.get('amount', 0),
        'timestamp': data['timestamp']
    }

def format_result(kv, window=beam.DoFn.WindowParam):
    """Attach the window start to each (user_id, total) pair."""
    user_id, total = kv
    return {
        'user_id': user_id,
        'total_amount': total,
        'window_start': window.start.to_rfc3339()
    }

def run_streaming_pipeline():
    options = PipelineOptions([
        '--project', 'my-project',
        '--region', 'us-central1',
        '--runner', 'DataflowRunner',
        '--temp_location', 'gs://my-bucket/temp',
        '--streaming'
    ])
    options.view_as(StandardOptions).streaming = True  # same effect as --streaming

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            # Event time defaults to the Pub/Sub publish time.
            | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(
                topic='projects/my-project/topics/user-events'
            )
            | 'Parse Messages' >> beam.Map(parse_pubsub_message)
            # CombinePerKey needs (key, value) pairs
            | 'Key by User' >> beam.Map(lambda e: (e['user_id'], float(e['amount'])))
            | 'Window into 1-minute' >> beam.WindowInto(
                FixedWindows(60),
                # Early panes repeatedly, 10 s of processing time after each pane's
                # first element, then an on-time pane when the watermark passes
                # the end of the window
                trigger=AfterWatermark(
                    early=AfterProcessingTime(10)
                ),
                # Each row holds only the increment since the previous pane, so
                # downstream queries should SUM per user and window
                accumulation_mode=AccumulationMode.DISCARDING
            )
            | 'Aggregate by User' >> beam.CombinePerKey(sum)
            | 'Format Results' >> beam.Map(format_result)
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                'my-project:analytics.user_totals',
                schema='user_id:STRING,total_amount:FLOAT64,window_start:TIMESTAMP',
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
            )
        )

Windowing Strategies

Windowing divides streaming data into finite groups for processing.

Fixed Windows

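from apache_beam.transforms.window import FixedWindows

# Fixed 5-minute windows
# `to_key_value` is a hypothetical helper: Pub/Sub bytes -> (key, number)
(
    pipeline
    | 'Read' >> beam.io.ReadFromPubSub(topic='projects/my-project/topics/my-topic')
    | 'Key' >> beam.Map(to_key_value)
    | 'Window' >> beam.WindowInto(FixedWindows(300))  # 5 minutes
    | 'Process' >> beam.CombinePerKey(sum)
)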
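Fixed-window assignment is plain arithmetic on the event timestamp. A plain-Python model of the rule (an illustration, not Beam's source), assuming timestamps in seconds and Beam's default offset of 0: an element at time t falls into the window that starts at t - (t mod size) and is size seconds long.

```python
def fixed_window(ts, size, offset=0):
    """Return the [start, end) fixed window containing event time ts (seconds)."""
    start = ts - ((ts - offset) % size)
    return (start, start + size)

# 5-minute windows, as in FixedWindows(300)
print(fixed_window(1000, 300))  # (900, 1200)
print(fixed_window(1200, 300))  # (1200, 1500): the window end is exclusive
```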

Sliding Windows

from apache_beam.transforms.window import SlidingWindows

# Sliding windows: 10-minute window, advance every 1 minute
# `to_key_value` is a hypothetical helper: Pub/Sub bytes -> (key, number)
(
    pipeline
    | 'Read' >> beam.io.ReadFromPubSub(topic='projects/my-project/topics/my-topic')
    | 'Key' >> beam.Map(to_key_value)
    | 'Window' >> beam.WindowInto(SlidingWindows(600, 60))  # 10min window, 1min advance
    | 'Process' >> beam.CombinePerKey(sum)
)
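With sliding windows, every element belongs to size / period windows at once, which multiplies the per-element aggregation work. A plain-Python model (illustrative only, not Beam's implementation) that lists the windows covering one timestamp, using the same size and period as `SlidingWindows(600, 60)`:

```python
def sliding_windows(ts, size, period, offset=0):
    """Return every [start, end) sliding window that contains event time ts."""
    last_start = ts - ((ts - offset) % period)
    windows = []
    start = last_start
    # Walk backwards one period at a time while the window still covers ts.
    while start > ts - size:
        windows.append((start, start + size))
        start -= period
    return windows

wins = sliding_windows(1000, 600, 60)
print(len(wins))          # 10, i.e. size / period
print(wins[0], wins[-1])  # (960, 1560) (420, 1020)
```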

Session Windows

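from apache_beam.transforms.window import Sessions

# Session windows: gap duration of 5 minutes
# `to_key_value` is a hypothetical helper: Pub/Sub bytes -> (key, number)
(
    pipeline
    | 'Read' >> beam.io.ReadFromPubSub(topic='projects/my-project/topics/my-topic')
    | 'Key' >> beam.Map(to_key_value)
    | 'Window' >> beam.WindowInto(Sessions(300))  # 5-minute gap
    | 'Process' >> beam.CombinePerKey(sum)
)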
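Session windows start as one [t, t + gap) window per element and are then merged, per key, wherever they overlap. A plain-Python model of that merge for a single key (a sketch of the semantics, not Beam's merging code):

```python
def session_windows(timestamps, gap):
    """Merge per-element [t, t + gap) windows into sessions for one key."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            # Overlaps the current session, so extend its end.
            sessions[-1][1] = max(sessions[-1][1], ts + gap)
        else:
            sessions.append([ts, ts + gap])
    return [tuple(s) for s in sessions]

# One user's events (seconds) with a 5-minute gap, as in Sessions(300)
print(session_windows([0, 120, 400, 1000, 1250], 300))
# [(0, 700), (1000, 1550)]
```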

Global Windows

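from apache_beam.transforms.window import GlobalWindows
from apache_beam.transforms.trigger import (
    AccumulationMode, AfterProcessingTime, Repeatedly
)

# Global window with custom trigger. In streaming, the default trigger waits
# for the end of the global window, which never arrives.
# `to_key_value` is a hypothetical helper: Pub/Sub bytes -> (key, number)
(
    pipeline
    | 'Read' >> beam.io.ReadFromPubSub(topic='projects/my-project/topics/my-topic')
    | 'Key' >> beam.Map(to_key_value)
    | 'Window' >> beam.WindowInto(
        GlobalWindows(),
        # Repeatedly(...) fires every minute; a bare AfterProcessingTime(60)
        # would fire only once
        trigger=Repeatedly(AfterProcessingTime(60)),
        accumulation_mode=AccumulationMode.ACCUMULATING  # running totals
    )
    | 'Process' >> beam.CombinePerKey(sum)
)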

Triggers

Triggers determine when to emit results from windows.

Common Trigger Patterns

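from apache_beam.transforms.trigger import (
    AfterWatermark, AfterProcessingTime, AfterCount,
    AccumulationMode, Repeatedly, AfterAny
)

# Early: speculative panes, each fired 30 s (processing time) after the
#   first element of the pane, until the watermark passes the window end
# On-time: one pane when the watermark passes the window end
# Late: a pane 5 min (processing time) after the first late element; how late
#   data may arrive at all is set separately with allowed_lateness on WindowInto
trigger = AfterWatermark(
    early=AfterProcessingTime(30),
    late=AfterProcessingTime(300)
)

# Count-based trigger: AfterCount fires only once, so wrap it in Repeatedly
# to emit every 1000 elements
trigger = Repeatedly(AfterCount(1000))

# Composite trigger: fire on whichever comes first, 1000 elements or 60 s
trigger = Repeatedly(AfterAny(AfterCount(1000), AfterProcessingTime(60)))

# Repeated trigger: re-arm the inner trigger after every firing
trigger = Repeatedly(AfterCount(100))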
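The difference between a one-shot trigger and `Repeatedly` is easy to miss. A plain-Python model of a count trigger, assuming the elements of a single window arrive one at a time; it only tracks count-based firings and ignores what happens when the window expires:

```python
def count_trigger_firings(num_elements, count, repeated):
    """Return the 1-based element positions at which a count trigger fires."""
    firings = []
    seen = 0
    for position in range(1, num_elements + 1):
        seen += 1
        if seen >= count:
            firings.append(position)
            if not repeated:
                break  # a bare AfterCount is finished after its first firing
            seen = 0  # Repeatedly re-arms the inner trigger
    return firings

print(count_trigger_firings(350, 100, repeated=False))  # [100]
print(count_trigger_firings(350, 100, repeated=True))   # [100, 200, 300]
```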

Accumulation Modes

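from apache_beam.transforms.trigger import AccumulationMode

# DISCARDING: each firing emits only the elements that arrived since the
# previous firing of the same window. This is the default with the default
# trigger; Beam Python requires an explicit mode once a custom trigger is set.
accumulation_mode = AccumulationMode.DISCARDING

# ACCUMULATING: each firing emits everything the window has received so far,
# so later panes supersede earlier ones
accumulation_mode = AccumulationMode.ACCUMULATING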
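The accumulation mode decides what each pane of a window contains, and therefore how a sink must combine successive panes. A plain-Python model of three firings of one window whose combine function is `sum` (illustrative, not Beam code):

```python
def panes(batches, mode):
    """Return the sum emitted at each trigger firing for one window.

    batches: the elements that arrived between consecutive firings.
    """
    emitted, seen = [], []
    for batch in batches:
        if mode == 'DISCARDING':
            emitted.append(sum(batch))  # only what arrived since the last firing
        else:  # 'ACCUMULATING'
            seen.extend(batch)
            emitted.append(sum(seen))  # everything the window has seen so far
    return emitted

arrivals = [[5, 3], [2], [4, 1]]  # e.g. early, on-time and late firings
print(panes(arrivals, 'DISCARDING'))    # [8, 2, 5]
print(panes(arrivals, 'ACCUMULATING'))  # [8, 10, 15]
```

With DISCARDING, a downstream consumer has to add the panes together; with ACCUMULATING, each pane replaces the previous one, which suits sinks that overwrite or upsert by key and window.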

Watermarks

Watermarks track progress through event time. They determine when windows are considered complete.

Watermark Concepts

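Event Time: When the event actually occurred
Processing Time: When the pipeline processes the event
Watermark: The system's estimate of event-time progress, i.e. a time T such that all data with event time before T is expected to have arrived
Lateness: An element is late when it arrives after the watermark has already passed the end of its window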
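The watermark, the window's end, and `allowed_lateness` together decide whether an arriving element is on time, late, or dropped. A simplified plain-Python model, assuming fixed windows and all values in seconds, and ignoring Beam's exact boundary handling at the window end:

```python
def classify(event_time, watermark, window_size, allowed_lateness):
    """Classify an element against the watermark at the moment it arrives."""
    window_end = event_time - (event_time % window_size) + window_size
    if watermark < window_end:
        return 'on_time'   # window not yet complete by the watermark's estimate
    if watermark < window_end + allowed_lateness:
        return 'late'      # still accepted and can fire a late pane
    return 'dropped'       # window already expired and its state was cleared

# 60 s windows with 300 s of allowed lateness
print(classify(30, watermark=45, window_size=60, allowed_lateness=300))   # on_time
print(classify(30, watermark=200, window_size=60, allowed_lateness=300))  # late
print(classify(30, watermark=400, window_size=60, allowed_lateness=300))  # dropped
```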

Watermark Management

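import json

import apache_beam as beam
from apache_beam.transforms.window import FixedWindows, TimestampedValue
from apache_beam.transforms.trigger import (
    AfterWatermark, AfterProcessingTime, AccumulationMode
)

def add_event_timestamp(message):
    """Use the event's own time as its Beam timestamp.

    Assumes the payload is JSON with 'timestamp' in Unix epoch seconds.
    """
    event = json.loads(message.decode('utf-8'))
    return TimestampedValue(event, event['timestamp'])

# Watermark tracking in pipeline.
# The Pub/Sub source still estimates the watermark from publish time;
# ReadFromPubSub(timestamp_attribute=...) makes it track an event-time attribute.
(
    pipeline
    | 'Read' >> beam.io.ReadFromPubSub(topic='projects/my-project/topics/my-topic')
    | 'Add Timestamps' >> beam.Map(add_event_timestamp)
    | 'Window' >> beam.WindowInto(
        FixedWindows(60),
        trigger=AfterWatermark(
            early=AfterProcessingTime(10),
            late=AfterProcessingTime(300)
        ),
        allowed_lateness=300,  # keep window state 5 minutes past the watermark
        accumulation_mode=AccumulationMode.DISCARDING
    )
)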

Handling Late Data

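import apache_beam as beam
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import (
    AfterWatermark, AfterProcessingTime, AccumulationMode
)
from apache_beam.utils.windowed_value import PaneInfoTiming

class TagLateResults(beam.DoFn):
    """Label each (user_id, total) result with its pane timing and window."""
    def process(self, element, window=beam.DoFn.WindowParam,
                pane_info=beam.DoFn.PaneInfoParam):
        user_id, total = element
        yield {
            'user_id': user_id,
            'total': total,
            # The runner reports each pane as EARLY, ON_TIME or LATE
            'is_late': pane_info.timing == PaneInfoTiming.LATE,
            'window_start': window.start.to_rfc3339()
        }

# Pipeline with late data handling
# `to_key_value` is a hypothetical helper: Pub/Sub bytes -> (user_id, amount)
(
    pipeline
    | 'Read' >> beam.io.ReadFromPubSub(topic='projects/my-project/topics/my-topic')
    | 'Key' >> beam.Map(to_key_value)
    | 'Window' >> beam.WindowInto(
        FixedWindows(60),
        trigger=AfterWatermark(
            early=AfterProcessingTime(10),
            late=AfterProcessingTime(300)
        ),
        allowed_lateness=300,
        # ACCUMULATING: a late pane carries the full, corrected total
        accumulation_mode=AccumulationMode.ACCUMULATING
    )
    | 'Combine' >> beam.CombinePerKey(sum)
    | 'Tag Late' >> beam.ParDo(TagLateResults())
)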

Autoscaling

Dataflow can automatically adjust the number of workers, up to max_num_workers, as the workload changes (for example, in response to backlog and worker CPU utilization).

Autoscaling Configuration

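from apache_beam.options.pipeline_options import PipelineOptions

# Enable autoscaling
options = PipelineOptions([
    '--project', 'my-project',
    '--region', 'us-central1',
    '--runner', 'DataflowRunner',
    '--temp_location', 'gs://my-bucket/temp',  # required by DataflowRunner
    '--num_workers', '2',         # initial worker count
    '--max_num_workers', '10',    # autoscaling upper bound
    '--autoscaling_algorithm', 'THROUGHPUT_BASED',
    '--worker_machine_type', 'n1-standard-4',
    '--disk_size_gb', '100',
    # Dataflow expects the full disk-type resource URL, not just 'pd-ssd'
    '--worker_disk_type',
    'compute.googleapis.com/projects/my-project/zones/us-central1-a/diskTypes/pd-ssd'
])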

Autoscaling Parameters

Parameter                Description                      Default
num_workers              Initial number of workers        3
max_num_workers          Maximum number of workers        1000
autoscaling_algorithm    THROUGHPUT_BASED or NONE         THROUGHPUT_BASED
worker_machine_type      Machine type for workers         n1-standard-1

Monitoring Autoscaling

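# Monitor autoscaling via the Dataflow API (google-cloud-dataflow-client)
from google.cloud import dataflow_v1beta3

jobs_client = dataflow_v1beta3.JobsV1Beta3Client()

# Get job status
job = jobs_client.get_job(
    request=dataflow_v1beta3.GetJobRequest(
        project_id='my-project',
        job_id='job-id',
        location='us-central1'
    )
)
print(f"Job state: {job.current_state.name}")

# Worker-count changes are reported as autoscaling events in the job messages
messages_client = dataflow_v1beta3.MessagesV1Beta3Client()
response = messages_client.list_job_messages(
    request=dataflow_v1beta3.ListJobMessagesRequest(
        project_id='my-project',
        job_id='job-id',
        location='us-central1'
    )
)
for page in response.pages:
    for event in page.autoscaling_events:
        print(f"{event.time}: {event.current_num_workers} -> {event.target_num_workers} workers")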

State Management

State management enables maintaining information across elements in a pipeline.

Keyed State

import apache_beam as beam
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import (
    BagStateSpec, CombiningValueStateSpec, TimerSpec, on_timer
)
from apache_beam.utils.timestamp import Duration, Timestamp

# State specifications (state and timers are scoped per key and window)
BUFFER_STATE = BagStateSpec('buffer', VarIntCoder())
COUNT_STATE = CombiningValueStateSpec('count', VarIntCoder(), sum)
EXPIRY_TIMER = TimerSpec('expiry', TimeDomain.REAL_TIME)

# Stateful DoFn: input must be (key, int) pairs
class StatefulCombineFn(beam.DoFn):
    def process(self, element, buffer=beam.DoFn.StateParam(BUFFER_STATE),
                count=beam.DoFn.StateParam(COUNT_STATE),
                timer=beam.DoFn.TimerParam(EXPIRY_TIMER)):
        key, value = element
        buffer.add(value)
        count.add(1)
        # Processing-time timer 5 minutes from now; re-setting it on every
        # element pushes the flush to 5 minutes after the latest element
        timer.set(Timestamp.now() + Duration(seconds=300))

    @on_timer(EXPIRY_TIMER)
    def on_expiry(self, key=beam.DoFn.KeyParam,
                  buffer=beam.DoFn.StateParam(BUFFER_STATE),
                  count=beam.DoFn.StateParam(COUNT_STATE)):
        yield (key, list(buffer.read()), count.read())
        buffer.clear()
        count.clear()
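The state and timer contract behind `StatefulCombineFn` can be modelled without Beam: state lives per key, setting a timer replaces any earlier timer for that key, and the timer callback emits and clears that key's state. A plain-Python sketch with an explicit processing-time clock; `BufferPerKey` and `advance_clock` are hypothetical names, not Beam APIs:

```python
from collections import defaultdict

class BufferPerKey:
    """Plain-Python model of per-key bag + count state with an expiry timer."""

    def __init__(self, expiry_secs):
        self.expiry_secs = expiry_secs
        self.buffer = defaultdict(list)
        self.count = defaultdict(int)
        self.timer = {}  # key -> processing time at which the timer fires

    def process(self, key, value, now):
        self.buffer[key].append(value)
        self.count[key] += 1
        # Overwrites any earlier timer for this key, so the flush happens
        # expiry_secs after the most recent element.
        self.timer[key] = now + self.expiry_secs

    def advance_clock(self, now):
        """Fire every timer due at or before `now` and return its outputs."""
        fired = []
        for key in sorted(k for k, t in self.timer.items() if t <= now):
            fired.append((key, self.buffer.pop(key), self.count.pop(key)))
            del self.timer[key]
        return fired

state = BufferPerKey(expiry_secs=300)
state.process('alice', 5, now=0)
state.process('bob', 7, now=10)
state.process('alice', 3, now=100)    # moves alice's timer from 300 to 400
print(state.advance_clock(320))  # [('bob', [7], 1)]
print(state.advance_clock(400))  # [('alice', [5, 3], 2)]
```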

Stateful Processing Example

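# Per-user buffering with state and a processing-time timer.
# Beam's state API does not support merging windows such as Sessions,
# so this example keeps the default global window.
# `parse_event` is a hypothetical helper: Pub/Sub bytes -> dict
(
    pipeline
    | 'Read' >> beam.io.ReadFromPubSub(topic='projects/my-project/topics/my-topic')
    | 'Parse' >> beam.Map(parse_event)
    | 'Key by User' >> beam.Map(lambda x: (x['user_id'], int(x['amount'])))
    | 'Stateful Combine' >> beam.ParDo(StatefulCombineFn())
    | 'Format Output' >> beam.Map(
        lambda r: {'user_id': r[0], 'amounts': r[1], 'count': r[2]})
)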

Monitoring and Debugging

Cloud Monitoring (formerly Stackdriver) Metrics

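# Custom metrics in pipeline
import time

import apache_beam as beam
from apache_beam.metrics import Metrics

def transform(element):
    """Hypothetical placeholder for the real per-element logic."""
    return element

class ProcessFn(beam.DoFn):
    def __init__(self):
        self.processed_counter = Metrics.counter(self.__class__, 'elements_processed')
        self.error_counter = Metrics.counter(self.__class__, 'errors')
        # A distribution records count, sum, min and max (not percentiles)
        self.latency_distribution = Metrics.distribution(self.__class__, 'processing_latency_ms')

    def process(self, element):
        start_time = time.perf_counter()
        try:
            result = transform(element)
        except Exception:
            self.error_counter.inc()
            raise  # fails the bundle; a dead-letter output avoids this
        finally:
            # Measured before yielding, so downstream work in the same
            # fused stage is not counted
            latency_ms = int((time.perf_counter() - start_time) * 1000)
            self.latency_distribution.update(latency_ms)
        self.processed_counter.inc()
        yield result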
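`Metrics.distribution` reports a summary rather than a histogram: count, sum, min and max, from which a mean can be derived. A p99 latency therefore cannot be read from it. A plain-Python model of that summary (illustrative, not Beam's implementation):

```python
class DistributionSummary:
    """Plain-Python model of the summary a Beam distribution metric keeps."""

    def __init__(self):
        self.count = 0
        self.sum = 0
        self.min = None
        self.max = None

    def update(self, value):
        self.count += 1
        self.sum += value
        self.min = value if self.min is None else min(self.min, value)
        self.max = value if self.max is None else max(self.max, value)

    @property
    def mean(self):
        return self.sum / self.count if self.count else None

latency = DistributionSummary()
for ms in [12, 7, 30, 11]:
    latency.update(ms)
print(latency.count, latency.min, latency.max, latency.mean)  # 4 7 30 15.0
```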

Monitoring Dashboard

# Create monitoring dashboard (google-cloud-monitoring-dashboards package)
from google.cloud import monitoring_dashboard_v1

client = monitoring_dashboard_v1.DashboardsServiceClient()
project_name = "projects/my-project"

# Dashboard definition
dashboard = monitoring_dashboard_v1.Dashboard({
    "display_name": "Dataflow Pipeline Dashboard",
    "mosaic_layout": {
        "tiles": [
            {
                "width": 6,
                "height": 4,
                "widget": {
                    "title": "Elements Processed",
                    "xy_chart": {
                        "data_sets": [{
                            "time_series_query": {
                                "time_series_filter": {
                                    "filter": 'metric.type="dataflow.googleapis.com/job/element_count"',
                                    "aggregation": {
                                        "alignment_period": {"seconds": 60},
                                        "per_series_aligner": "ALIGN_RATE"
                                    }
                                }
                            }
                        }]
                    }
                }
            }
        ]
    }
})

# Create the dashboard in the project
created = client.create_dashboard(
    request={"parent": project_name, "dashboard": dashboard}
)
print(f"Created dashboard: {created.name}")

Best Practices

  1. Use windowing appropriately - Match window size to business requirements
  2. Implement triggers - Handle early, on-time, and late data
  3. Monitor watermarks - Track watermark lag and late data
  4. Enable autoscaling - Let Dataflow manage worker count
  5. Use stateful processing - Maintain state across elements when needed
  6. Implement error handling - Use dead-letter patterns for failed elements
  7. Monitor costs - Track worker vCPU, memory, disk, and shuffle or Streaming Engine usage; for BigQuery sinks, also watch slot usage and query patterns
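Item 6 refers to the dead-letter pattern: instead of failing the bundle on a bad record, route it to a separate output together with the error. In Beam this is usually a DoFn that yields `beam.pvalue.TaggedOutput('dead_letter', ...)` for failures, applied with `.with_outputs('dead_letter', main='parsed')`. The routing logic itself, sketched in plain Python with a hypothetical `parse_event` parser:

```python
import json

def route(raw_messages, parse):
    """Split raw messages into parsed records and dead-letter entries."""
    good, dead_letter = [], []
    for raw in raw_messages:
        try:
            good.append(parse(raw))
        except (ValueError, KeyError) as err:
            # Keep the original payload and the error so the record can be replayed.
            dead_letter.append({'payload': raw, 'error': repr(err)})
    return good, dead_letter

def parse_event(raw):
    """Hypothetical parser: JSON text -> (user_id, amount)."""
    data = json.loads(raw)
    return (data['user_id'], float(data.get('amount', 0)))

good, bad = route(
    ['{"user_id": "u1", "amount": 5}', 'not json', '{"amount": 3}'],
    parse_event,
)
print(good)      # [('u1', 5.0)]
print(len(bad))  # 2
```

The dead-letter records are typically written to a separate sink (a GCS path, a BigQuery table or a Pub/Sub topic) so they can be inspected and reprocessed.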