Streaming Pipeline Architecture
Implementation
Pub/Sub Setup
from google.cloud import pubsub_v1
import json
# Create topic and subscriptions
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
project_id = "my-project"
# Create topic
topic_path = publisher.topic_path(project_id, "real-time-events")
topic = publisher.create_topic(request={"name": topic_path})
# Create subscription with exactly-once delivery
sub_path = subscriber.subscription_path(project_id, "events-to-dataflow")
subscription = subscriber.create_subscription(
    request={
        "name": sub_path,
        "topic": topic_path,
        "ack_deadline_seconds": 60,
        "enable_exactly_once_delivery": True,
        # The dead-letter topic must already exist before this call
        "dead_letter_policy": {
            "dead_letter_topic": f"projects/{project_id}/topics/events-deadletter",
            "max_delivery_attempts": 5,
        },
    }
)
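# Publish events
def publish_event(event_data):
    """Publish one event as JSON and return the server-assigned message ID."""
    data = json.dumps(event_data).encode("utf-8")
    future = publisher.publish(
        topic_path,
        data=data,
        # Attribute values must be strings, so coerce them and default missing keys
        event_type=str(event_data.get("event_type", "")),
        user_id=str(event_data.get("user_id", "")),
    )
    return future.result()  # Blocks until Pub/Sub acknowledges the publish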
Dataflow Streaming Pipeline
import json

import apache_beam as beam
from apache_beam.io.gcp.bigquery import WriteToBigQuery, BigQueryDisposition
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.trigger import (
    AfterWatermark, AfterProcessingTime, AccumulationMode
)
from apache_beam.transforms.window import FixedWindows
def run_streaming_pipeline():
    """Real-time streaming pipeline: Pub/Sub → Dataflow → BigQuery."""
    pipeline_options = PipelineOptions([
        '--project', 'my-project',
        '--runner', 'DataflowRunner',
        '--region', 'us-central1',
        '--temp_location', 'gs://my-data-lake/temp/',
        '--streaming',
        '--enable_streaming_engine',
        '--num_workers', '4',
        '--max_num_workers', '50',
        '--autoscaling_algorithm', 'THROUGHPUT_BASED',
    ])
    with beam.Pipeline(options=pipeline_options) as pipeline:
        # Read from Pub/Sub
        messages = (
            pipeline
            | 'Read Pub/Sub' >> beam.io.ReadFromPubSub(
                topic='projects/my-project/topics/real-time-events'
            )
            | 'Decode JSON' >> beam.Map(lambda x: json.loads(x.decode('utf-8')))
        )
        # Windowed aggregation (1-minute tumbling windows)
        windowed_counts = (
            messages
            | 'Window into 1 min' >> beam.WindowInto(
                FixedWindows(60),
                # Emit early panes roughly every 10 s, then a final pane at the watermark
                trigger=AfterWatermark(
                    early=AfterProcessingTime(10)
                ),
                # DISCARDING: each pane carries only the counts since the previous pane
                accumulation_mode=AccumulationMode.DISCARDING
            )
            | 'Extract event type' >> beam.Map(lambda x: (x['event_type'], 1))
            | 'Count per window' >> beam.CombinePerKey(sum)
        )
        # Write to BigQuery
        (
            windowed_counts
            | 'Format for BQ' >> beam.Map(
                # WindowParam injects the element's window; its start becomes the row timestamp
                lambda x, window=beam.DoFn.WindowParam: {
                    'event_type': x[0],
                    'event_count': x[1],
                    'window_timestamp': window.start.to_utc_datetime().isoformat(),
                })
            | 'Write to BQ' >> WriteToBigQuery(
                'my-project:analytics.real_time_counts',
                schema='event_type:STRING,event_count:INT64,window_timestamp:TIMESTAMP',
                write_disposition=BigQueryDisposition.WRITE_APPEND,
                create_disposition=BigQueryDisposition.CREATE_IF_NEEDED
            )
        )


if __name__ == '__main__':
    run_streaming_pipeline()
Best Practice: Always enable Streaming Engine for streaming pipelines to offload state management from workers. Use early triggers with AfterProcessingTime for low-latency results. Implement dead-letter queues for failed messages.
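The dead-letter advice can be illustrated without any Beam or Pub/Sub dependency. The sketch below is only one way to separate bad messages from good ones. The names `route_message` and `split_batch` and the `REQUIRED_FIELDS` tuple are hypothetical, not part of any library. In a real Beam pipeline the same split would more likely be done with tagged outputs, with the dead-letter branch written to a separate topic or table.

```python
import json

REQUIRED_FIELDS = ("event_type",)  # hypothetical: fields this sketch treats as mandatory


def route_message(raw: bytes):
    """Return ("ok", event) for a valid message, else ("dead_letter", record)."""
    try:
        event = json.loads(raw.decode("utf-8"))
        if not isinstance(event, dict):
            raise ValueError("payload is not a JSON object")
        missing = [f for f in REQUIRED_FIELDS if f not in event]
        if missing:
            raise ValueError(f"missing fields: {missing}")
        return "ok", event
    except ValueError as exc:
        # JSONDecodeError and UnicodeDecodeError are both subclasses of ValueError.
        record = {"raw": raw.decode("utf-8", errors="replace"), "error": str(exc)}
        return "dead_letter", record


def split_batch(messages):
    """Partition raw messages into parsed events and dead-letter records."""
    good, dead = [], []
    for raw in messages:
        tag, value = route_message(raw)
        (good if tag == "ok" else dead).append(value)
    return good, dead
```

Keeping the raw payload and the error text in the dead-letter record makes it possible to inspect and replay failed messages later instead of silently dropping them.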
Windowing Strategies
from apache_beam.transforms.window import (
    FixedWindows, SlidingWindows, Sessions, TimestampedValue
)
# Fixed Windows (tumbling)
# Non-overlapping, fixed-size windows
FixedWindows(60) # 1-minute tumbling windows
# Sliding Windows (overlapping)
# Overlapping windows for moving averages
SlidingWindows(60, 10) # 1-minute windows sliding every 10 seconds
# Session Windows (activity-based)
# Group events by activity gaps
Sessions(gap_size=300) # 5-minute gap between events
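# Custom Windowing with Timestamps
from datetime import datetime

def add_timestamp(element):
    """Attach the event time so windowing uses it instead of the publish time."""
    # Assumes element['timestamp'] is an ISO 8601 string, e.g. '2024-01-01T12:00:00+00:00'
    event_time = datetime.fromisoformat(element['timestamp']).timestamp()
    return TimestampedValue(element, event_time)  # event_time is seconds since the epoch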
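To make the three strategies concrete, the following plain-Python sketch computes which windows a timestamp would fall into. It is a simplified model for intuition only, not Beam's implementation. The function names are hypothetical, timestamps are plain seconds, and session boundaries at exactly the gap size may be treated differently by Beam.

```python
def fixed_window(ts, size):
    """Return the single [start, end) tumbling window that contains ts."""
    start = ts - (ts % size)
    return (start, start + size)


def sliding_windows(ts, size, period):
    """Return every [start, end) window of length `size`, starting each `period`, containing ts."""
    windows = []
    start = ts - (ts % period)  # latest window start that is <= ts
    while start > ts - size:    # the window still covers ts while start + size > ts
        windows.append((start, start + size))
        start -= period
    return sorted(windows)


def session_windows(timestamps, gap):
    """Group event times into sessions that close after `gap` seconds of inactivity."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            # Event arrives before the open session expires: extend the session.
            sessions[-1] = (sessions[-1][0], ts + gap)
        else:
            sessions.append((ts, ts + gap))
    return sessions
```

With a 60-second size and a 10-second period, each event belongs to six sliding windows, which is why sliding windows multiply the volume of downstream aggregation output compared with fixed windows.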
Looker Dashboard Integration
-- BigQuery view for the Looker real-time dashboard.
-- A logical view is used here because materialized views support neither
-- analytic (window) functions nor non-deterministic functions such as
-- CURRENT_TIMESTAMP(). Partitioning and clustering belong on the base table.
CREATE OR REPLACE VIEW `project.analytics.real_time_dashboard` AS
SELECT
  event_type,
  window_timestamp,
  event_count,
  -- Moving average over the current row and the 5 preceding rows
  AVG(event_count) OVER (
    PARTITION BY event_type
    ORDER BY window_timestamp
    ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
  ) AS moving_avg_count
FROM `project.analytics.real_time_counts`
WHERE window_timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY);
Common Interview Questions
Q1: What is the difference between at-least-once and exactly-once delivery?
Answer: At-least-once guarantees messages are delivered at least once but may be duplicated. Exactly-once guarantees each message is processed exactly once. For most use cases, at-least-once with idempotent processing is sufficient. Exactly-once is needed for financial or critical data.
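A minimal sketch of idempotent processing on top of at-least-once delivery is shown below. It assumes each message carries a stable unique ID. The `IdempotentCounter` class is hypothetical, and the in-memory set stands in for what would normally be a durable, bounded store.

```python
class IdempotentCounter:
    """Counts events per type while ignoring redelivered messages."""

    def __init__(self):
        self.seen_ids = set()  # assumption: in-memory only; real systems need durable state
        self.counts = {}

    def process(self, message_id, event_type):
        """Apply the event once; return False if this message_id was already applied."""
        if message_id in self.seen_ids:
            return False
        self.seen_ids.add(message_id)
        self.counts[event_type] = self.counts.get(event_type, 0) + 1
        return True


counter = IdempotentCounter()
counter.process("m1", "click")
counter.process("m2", "view")
counter.process("m1", "click")  # redelivery of m1 is ignored
```

Because the duplicate is dropped before the counter is updated, redelivery does not change the result, which is the property that makes at-least-once delivery safe for this kind of aggregation.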
Q2: How do you handle backpressure in streaming pipelines?
Answer: Use Pub/Sub flow control to limit message rates. Implement throttling in Dataflow workers. Use autoscaling to handle load spikes. Monitor queue depth and processing lag. Implement circuit breaker patterns for downstream systems.
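The circuit breaker mentioned above can be sketched in a few lines. This is an illustrative, single-threaded version with hypothetical names and default values. It stops calling a failing downstream system for a cool-down period, then lets one trial call through.

```python
import time


class CircuitBreaker:
    """Skips downstream calls after repeated failures until a cool-down has passed."""

    def __init__(self, failure_threshold=3, reset_after=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after  # cool-down in seconds
        self.clock = clock              # injectable so the breaker can be tested
        self.failures = 0
        self.opened_at = None           # None means the circuit is closed

    def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_after:
                raise RuntimeError("circuit open: downstream call skipped")
            # Cool-down elapsed: allow one trial call; a single failure reopens the circuit.
            self.opened_at = None
            self.failures = self.failure_threshold - 1
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = self.clock()
            raise
        self.failures = 0
        return result
```

While the circuit is open, the caller can leave messages unacknowledged so that they are redelivered later, which pushes the backlog back into Pub/Sub instead of overloading the downstream system.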
Q3: What is the purpose of windowing in streaming?
Answer: Windowing divides unbounded streams into finite groups for processing. Fixed windows provide regular aggregations. Sliding windows enable moving averages. Session windows group activity by user behavior. Windowing is essential for time-based analytics on streaming data.
Q4: How do you minimize latency in streaming pipelines?
Answer: 1) Use Streaming Engine for state management, 2) Enable early triggers with AfterProcessingTime, 3) Consume Pub/Sub through pull subscriptions with streaming pull for high-throughput, low-latency delivery, 4) Optimize BigQuery streaming inserts, 5) Use BI Engine for dashboard queries, 6) Minimize network hops.
Q5: How do you monitor streaming pipeline health?
Answer: Monitor: 1) Pub/Sub unacked message count, 2) Dataflow worker CPU/memory, 3) Processing lag (watermark delay), 4) BigQuery streaming insert errors, 5) End-to-end latency, 6) Cost per hour. Set alerts for anomalies.
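As a rough illustration of alerting on these signals, the sketch below compares a snapshot of metrics with fixed thresholds. The metric names and limits are hypothetical placeholders, not names exported by Cloud Monitoring, and a production setup would normally define alert policies in the monitoring system itself.

```python
# Hypothetical metric names and limits, chosen only for this example
DEFAULT_THRESHOLDS = {
    "unacked_messages": 10_000,
    "watermark_lag_seconds": 120,
    "bq_insert_errors_per_min": 0,
    "end_to_end_latency_seconds": 30,
}


def find_alerts(metrics, thresholds=DEFAULT_THRESHOLDS):
    """Return a message for every metric that is missing or above its limit."""
    alerts = []
    for name, limit in thresholds.items():
        value = metrics.get(name)
        if value is None:
            # A metric that stops reporting is itself a health signal.
            alerts.append(f"{name}: no data")
        elif value > limit:
            alerts.append(f"{name}: {value} exceeds {limit}")
    return alerts


snapshot = {
    "unacked_messages": 500,
    "watermark_lag_seconds": 300,
    "bq_insert_errors_per_min": 0,
}
alerts = find_alerts(snapshot)
```

Here the snapshot would raise two alerts: the watermark lag is above its limit, and the end-to-end latency metric is missing entirely.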