
Streaming Pipeline Architecture: Pub/Sub → Dataflow → BigQuery → Looker


Build production-ready real-time streaming pipelines using Pub/Sub, Dataflow, BigQuery, and Looker on GCP.

22 min read · Advanced

🏗️ GCP Data Engineering Reference Architecture
Data sources: On-Prem DB, SaaS APIs, IoT Sensors, Mobile Apps, REST APIs
Ingestion layer: Dataflow (CDC), Pub/Sub, Cloud Tasks, Storage Transfer, Transfer Appliance
Raw data zone (Cloud Storage): landing/ (ingested data), bronze/ (unvalidated), archive/ (historical), raw/ (original format), staging/ (temp processing)
Processing layer: Dataflow (stream + batch), Dataproc (Spark/Hadoop), Cloud Functions (event-driven), Data Prep (visual ETL), Cloud Composer (orchestration)
Curated data zone: silver/ (cleaned, validated), gold/ (business-ready), aggregates/ (pre-computed), features/ (ML features)
BigQuery (warehouse), Looker (BI), Vertex AI (ML), Data Studio, Dataplex
Interview Tip: GCP's data engineering stack is serverless-first. Dataflow (Apache Beam) handles both streaming and batch. BigQuery is the flagship analytics service.

Implementation

Pub/Sub Setup

from google.cloud import pubsub_v1
import json

# Create topic and subscriptions
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
project_id = "my-project"

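# Create the main topic and the dead-letter topic. The dead-letter topic
# must exist before a subscription's dead_letter_policy can reference it.
topic_path = publisher.topic_path(project_id, "real-time-events")
dead_letter_topic_path = publisher.topic_path(project_id, "events-deadletter")
topic = publisher.create_topic(request={"name": topic_path})
publisher.create_topic(request={"name": dead_letter_topic_path})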

# Create subscription with exactly-once delivery
sub_path = subscriber.subscription_path(project_id, "events-to-dataflow")
subscription = subscriber.create_subscription(
    request={
        "name": sub_path,
        "topic": topic_path,
        "ack_deadline_seconds": 60,
        "enable_exactly_once_delivery": True,
        "dead_letter_policy": {
            "dead_letter_topic": f"projects/{project_id}/topics/events-deadletter",
            "max_delivery_attempts": 5
        }
    }
)

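# Publish events
def publish_event(event_data):
    """Publish one event as JSON and return the server-assigned message ID."""
    data = json.dumps(event_data).encode("utf-8")
    # Pub/Sub attribute values must be strings, so coerce them and fall back
    # to an empty string when a field is missing or None.
    future = publisher.publish(
        topic_path,
        data=data,
        event_type=str(event_data.get("event_type") or ""),
        user_id=str(event_data.get("user_id") or ""),
    )
    # Block until the publish completes; result() returns the message ID.
    return future.result()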

Dataflow Streaming Pipeline

import json

import apache_beam as beam
from apache_beam.io.gcp.bigquery import WriteToBigQuery, BigQueryDisposition
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.trigger import (
    AfterWatermark, AfterProcessingTime, AccumulationMode
)
from apache_beam.transforms.window import FixedWindows

def run_streaming_pipeline():
    """Real-time streaming pipeline: Pub/Sub → Dataflow → BigQuery."""
    pipeline_options = PipelineOptions([
        '--project', 'my-project',
        '--runner', 'DataflowRunner',
        '--region', 'us-central1',
        '--temp_location', 'gs://my-data-lake/temp/',
        '--streaming',
        '--enable_streaming_engine',
        '--num_workers', '4',
        '--max_num_workers', '50',
        '--autoscaling_algorithm', 'THROUGHPUT_BASED'
    ])

    with beam.Pipeline(options=pipeline_options) as pipeline:
        # Read from Pub/Sub
        messages = (
            pipeline
            | 'Read Pub/Sub' >> beam.io.ReadFromPubSub(
                topic='projects/my-project/topics/real-time-events'
            )
            | 'Decode JSON' >> beam.Map(lambda x: json.loads(x.decode('utf-8')))
        )

        # Windowed aggregation (1-minute tumbling windows)
        windowed_counts = (
            messages
            | 'Window into 1 min' >> beam.WindowInto(
                FixedWindows(60),
                trigger=AfterWatermark(
                    early=AfterProcessingTime(10)
                ),
                accumulation_mode=AccumulationMode.DISCARDING
            )
            | 'Extract event type' >> beam.Map(lambda x: (x['event_type'], 1))
            | 'Count per window' >> beam.CombinePerKey(sum)
        )

        # Write to BigQuery
        (
            windowed_counts
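            | 'Format for BQ' >> beam.Map(
                # WindowParam injects the element's window; its start time
                # becomes the row's window timestamp.
                lambda x, window=beam.DoFn.WindowParam: {
                    'event_type': x[0],
                    'event_count': x[1],
                    'window_timestamp': window.start.to_rfc3339()
                })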
            | 'Write to BQ' >> WriteToBigQuery(
                'my-project:analytics.real_time_counts',
                schema='event_type:STRING,event_count:INT64,window_timestamp:TIMESTAMP',
                write_disposition=BigQueryDisposition.WRITE_APPEND,
                create_disposition=BigQueryDisposition.CREATE_IF_NEEDED
            )
        )

if __name__ == '__main__':
    run_streaming_pipeline()
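
The pipeline above combines early firings with AccumulationMode.DISCARDING, so each firing writes only the events that arrived since the previous firing, and a window's total is the sum of its rows. The following is a minimal pure-Python sketch of that difference, not Beam code; `pane_outputs` and the sample arrivals are hypothetical names and data used only for illustration.

```python
from typing import List


def pane_outputs(firings: List[List[int]], accumulating: bool) -> List[int]:
    """Return the count emitted at each trigger firing for a single window.

    `firings` holds the elements that arrived between consecutive firings.
    """
    outputs = []
    running_total = 0
    for new_elements in firings:
        running_total += len(new_elements)
        # Discarding panes report only what arrived since the last firing;
        # accumulating panes report everything seen so far in the window.
        outputs.append(running_total if accumulating else len(new_elements))
    return outputs


# Hypothetical arrivals: 3, then 2, then 1 new events across three firings.
arrivals = [[1, 1, 1], [1, 1], [1]]
discarding = pane_outputs(arrivals, accumulating=False)
accumulating = pane_outputs(arrivals, accumulating=True)
```

With discarding panes, a dashboard query would need to sum the rows per window; with accumulating panes it would take the latest row instead.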

Best Practice: Always enable Streaming Engine for streaming pipelines to offload state management from workers. Use early triggers with AfterProcessingTime for low-latency results. Implement dead-letter queues for failed messages.

Windowing Strategies

from apache_beam.transforms.window import (
    FixedWindows, SlidingWindows, Sessions, TimestampedValue
)

# Fixed Windows (tumbling)
# Non-overlapping, fixed-size windows
FixedWindows(60)  # 1-minute tumbling windows

# Sliding Windows (overlapping)
# Overlapping windows for moving averages
SlidingWindows(60, 10)  # 1-minute windows sliding every 10 seconds

# Session Windows (activity-based)
# Group events by activity gaps
Sessions(gap_size=300)  # 5-minute gap between events

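# Custom Windowing with Timestamps
from datetime import datetime

def add_timestamp(element):
    """Attach the event time so windowing uses it instead of the publish time."""
    # Assumes element['timestamp'] is an ISO 8601 string with a UTC offset,
    # for example "2024-01-01T12:00:00+00:00".
    event_time = datetime.fromisoformat(element['timestamp']).timestamp()
    return TimestampedValue(element, event_time)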
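
To make the three strategies concrete, here is a rough pure-Python sketch of how a timestamp (in seconds) maps to windows under each one. It illustrates the idea only and is not Beam's implementation; the function names are hypothetical, and windows are shown as half-open (start, end) pairs.

```python
from typing import List, Tuple

Window = Tuple[float, float]  # half-open interval [start, end)


def fixed_window(ts: float, size: float) -> Window:
    """Each timestamp falls into exactly one tumbling window."""
    start = ts - (ts % size)
    return (start, start + size)


def sliding_windows(ts: float, size: float, period: float) -> List[Window]:
    """Each timestamp falls into size / period overlapping windows."""
    windows = []
    start = ts - (ts % period)  # latest window start at or before ts
    while start > ts - size:
        windows.append((start, start + size))
        start -= period
    return windows


def session_windows(timestamps: List[float], gap: float) -> List[Window]:
    """Merge events into sessions separated by at least `gap` of inactivity."""
    sessions: List[Window] = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            # Event lands inside the open session, so extend its end.
            sessions[-1] = (sessions[-1][0], ts + gap)
        else:
            sessions.append((ts, ts + gap))
    return sessions


fixed = fixed_window(125, 60)
sliding = sliding_windows(125, 60, 10)
sessions = session_windows([0, 100, 250, 700], 300)
```

Here an event at second 125 lands in one fixed window but six sliding windows, which is why sliding windows multiply the data processed by roughly size divided by period.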

Looker Dashboard Integration

-- BigQuery view for the Looker real-time dashboard.
-- A standard view is used here because incremental materialized views do not
-- support analytic (window) functions or non-deterministic functions such as
-- CURRENT_TIMESTAMP(). Partitioning and clustering belong on the base table.
CREATE OR REPLACE VIEW `my-project.analytics.real_time_dashboard`
AS
SELECT
    event_type,
    window_timestamp,
    event_count,
    -- Moving average over the current row and the five preceding rows
    AVG(event_count) OVER (
        PARTITION BY event_type
        ORDER BY window_timestamp
        ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
    ) AS moving_avg_count
FROM `my-project.analytics.real_time_counts`
WHERE window_timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY);

Common Interview Questions

Q1: What is the difference between at-least-once and exactly-once delivery?

Answer: At-least-once guarantees messages are delivered at least once but may be duplicated. Exactly-once guarantees each message is processed exactly once. For most use cases, at-least-once with idempotent processing is sufficient. Exactly-once is needed for financial or critical data.
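
A small sketch of what "at-least-once with idempotent processing" can look like: the consumer remembers which message IDs it has already applied, so a redelivered message has no further effect. The names, the in-memory set, and the sample data are hypothetical; a real system would keep the seen IDs in durable storage.

```python
from typing import Dict, Iterable, Set, Tuple

Delivery = Tuple[str, str, int]  # (message_id, account, amount)


def apply_idempotently(
    deliveries: Iterable[Delivery],
    balances: Dict[str, int],
    seen_ids: Set[str],
) -> None:
    """Apply each message at most once, even if it is delivered repeatedly."""
    for message_id, account, amount in deliveries:
        if message_id in seen_ids:
            continue  # duplicate delivery: already applied, so skip it
        balances[account] = balances.get(account, 0) + amount
        seen_ids.add(message_id)


balances: Dict[str, int] = {}
seen: Set[str] = set()
# Message "m1" is delivered twice, as at-least-once delivery permits.
apply_idempotently(
    [("m1", "acct-1", 10), ("m2", "acct-1", 5), ("m1", "acct-1", 10)],
    balances,
    seen,
)
```

Without the `seen_ids` check the balance would be 25 instead of 15, which is the kind of error that makes duplicates unacceptable for financial data.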

Q2: How do you handle backpressure in streaming pipelines?

Answer: Use Pub/Sub flow control to limit message rates. Implement throttling in Dataflow workers. Use autoscaling to handle load spikes. Monitor queue depth and processing lag. Implement circuit breaker patterns for downstream systems.
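
The idea behind flow control can be sketched with a bounded buffer: once the buffer is full, new items are refused until the consumer catches up, which pushes the slowdown back to the producer. This is an illustrative stand-in using the standard library, not the Pub/Sub client's own flow-control settings; `offer` and the buffer size are hypothetical.

```python
import queue


def offer(buffer: "queue.Queue[int]", item: int) -> bool:
    """Try to enqueue without blocking; False signals the producer to back off."""
    try:
        buffer.put_nowait(item)
        return True
    except queue.Full:
        return False


# Hypothetical limit: at most 3 messages may be outstanding at once.
buffer: "queue.Queue[int]" = queue.Queue(maxsize=3)
accepted = [offer(buffer, i) for i in range(5)]

# The consumer finishes one message, which frees capacity for the producer.
buffer.get_nowait()
accepted_after_drain = offer(buffer, 99)
```

In the Pub/Sub Python client, the subscriber's flow-control options play a similar role by capping how many messages are outstanding at a time.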

Q3: What is the purpose of windowing in streaming?

Answer: Windowing divides unbounded streams into finite groups for processing. Fixed windows provide regular aggregations. Sliding windows enable moving averages. Session windows group activity by user behavior. Windowing is essential for time-based analytics on streaming data.

Q4: How do you minimize latency in streaming pipelines?

Answer: 1) Use Streaming Engine for state management, 2) Enable early triggers with AfterProcessingTime, 3) Use Pub/Sub pull subscriptions so subscribers can fetch messages in batches, 4) Optimize BigQuery streaming inserts, 5) Use BI Engine for dashboard queries, 6) Minimize network hops.

Q5: How do you monitor streaming pipeline health?

Answer: Monitor: 1) Pub/Sub unacked message count, 2) Dataflow worker CPU/memory, 3) Processing lag (watermark delay), 4) BigQuery streaming insert errors, 5) End-to-end latency, 6) Cost per hour. Set alerts for anomalies.
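
As a rough illustration of turning these signals into alerts, the sketch below checks a metrics snapshot against thresholds. The class, field names, and threshold values are all hypothetical; in practice the values would come from Cloud Monitoring and the limits would be tuned per pipeline.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class PipelineMetrics:
    unacked_messages: int         # Pub/Sub backlog size
    watermark_lag_seconds: float  # how far processing trails event time
    bq_insert_errors: int         # failed BigQuery streaming inserts


def find_alerts(
    metrics: PipelineMetrics,
    max_unacked: int = 10_000,       # hypothetical threshold
    max_lag_seconds: float = 120.0,  # hypothetical threshold
) -> List[str]:
    """Return the names of the checks that the snapshot violates."""
    alerts = []
    if metrics.unacked_messages > max_unacked:
        alerts.append("pubsub_backlog")
    if metrics.watermark_lag_seconds > max_lag_seconds:
        alerts.append("watermark_lag")
    if metrics.bq_insert_errors > 0:
        alerts.append("bigquery_insert_errors")
    return alerts


healthy = find_alerts(PipelineMetrics(500, 12.0, 0))
degraded = find_alerts(PipelineMetrics(50_000, 300.0, 3))
```

A growing backlog together with rising watermark lag usually points at the pipeline falling behind, while insert errors alone point at the sink.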
