Pub/Sub Architecture
Google Cloud Pub/Sub is a fully managed, real-time messaging service that allows any-to-any communication between applications and data systems.
Architecture Overview
Topic and Subscription Management
Creating Topics and Subscriptions
from google.cloud import pubsub_v1
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Encoding, Schema

# Clients: topics are managed by the publisher, subscriptions by the
# subscriber, and schemas by a dedicated schema-service client.
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
schema_client = SchemaServiceClient()

project_id = "my-project"
topic_id = "events"
topic_path = publisher.topic_path(project_id, topic_id)

# Create a schema for structured messages. Pub/Sub schemas must be Avro or
# Protocol Buffers (JSON Schema is not a supported schema type). Proto3 has
# no "required" fields, so publishers must set event_id, event_type and
# user_id themselves.
schema_id = "event-schema"
schema_path = schema_client.schema_path(project_id, schema_id)
event_proto = """
syntax = "proto3";

message Event {
  string event_id = 1;
  string event_type = 2;
  string user_id = 3;
  string timestamp = 4;
  double amount = 5;
}
"""
schema = schema_client.create_schema(
    request={
        "parent": f"projects/{project_id}",
        "schema": Schema(
            name=schema_path,
            type_=Schema.Type.PROTOCOL_BUFFER,
            definition=event_proto,
        ),
        "schema_id": schema_id,
    }
)

# Create the topic bound to the schema: messages are published as JSON and
# validated against it, and storage is pinned to us-central1.
# Ordering is NOT a topic setting. It is enabled on the publisher client and
# on the subscription below.
topic = publisher.create_topic(
    request={
        "name": topic_path,
        "schema_settings": {
            "schema": schema_path,
            "encoding": Encoding.JSON,
        },
        "message_storage_policy": {
            "allowed_persistence_regions": ["us-central1"],
        },
    }
)

# The dead-letter topic referenced by the subscription must already exist.
# It also needs its own subscription (not shown), otherwise dead-lettered
# messages are not retained anywhere.
dead_letter_topic_path = publisher.topic_path(project_id, f"{topic_id}-deadletter")
publisher.create_topic(request={"name": dead_letter_topic_path})

# Create a pull subscription with exactly-once and ordered delivery.
# The Pub/Sub service agent needs publisher rights on the dead-letter topic
# and subscriber rights on this subscription to forward failed messages.
subscription_path = subscriber.subscription_path(project_id, f"{topic_id}-sub")
subscription = subscriber.create_subscription(
    request={
        "name": subscription_path,
        "topic": topic_path,
        "ack_deadline_seconds": 60,
        "message_retention_duration": {"seconds": 604800},  # 7 days
        "enable_exactly_once_delivery": True,
        "enable_message_ordering": True,
        "dead_letter_policy": {
            "dead_letter_topic": dead_letter_topic_path,
            "max_delivery_attempts": 5,  # allowed range is 5-100
        },
        "retry_policy": {
            "minimum_backoff": {"seconds": 10},
            "maximum_backoff": {"seconds": 600},
        },
    }
)
Publishing Messages with Ordering
from google.cloud import pubsub_v1
import json

# Publishing with an ordering key raises ValueError unless message ordering is
# enabled on the publisher client itself.
publisher = pubsub_v1.PublisherClient(
    publisher_options=pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
)
topic_path = publisher.topic_path("my-project", "events")


def publish_with_ordering(event_data):
    """Publish one event, ordered per user.

    Messages that share an ordering key (here the event's ``user_id``) are
    delivered in publish order to subscriptions that have message ordering
    enabled.

    Args:
        event_data: JSON-serializable dict describing the event.

    Returns:
        The server-assigned message ID.

    Raises:
        Whatever ``future.result()`` raises when the publish fails. Before
        re-raising, publishing is resumed for that ordering key so that later
        events for the same user are not blocked.
    """
    message_data = json.dumps(event_data).encode("utf-8")

    # Use the entity ID as the ordering key. Events without a user share one
    # "default" key, which caps their combined publish throughput.
    ordering_key = event_data.get("user_id") or "default"

    # Attribute values must be text; skip missing ones instead of sending None.
    attributes = {
        name: str(event_data[name])
        for name in ("event_type", "user_id")
        if event_data.get(name) is not None
    }

    future = publisher.publish(
        topic_path,
        data=message_data,
        ordering_key=ordering_key,
        **attributes,
    )
    try:
        return future.result()
    except Exception:
        # A failed publish pauses its ordering key; resume it so the caller
        # can retry and later messages for this key can be published.
        publisher.resume_publish(topic_path, ordering_key)
        raise


# Publish ordered events for a user
events = [
    {"event_id": "1", "event_type": "login", "user_id": "user_123", "timestamp": "2025-01-15T10:00:00Z"},
    {"event_id": "2", "event_type": "view", "user_id": "user_123", "timestamp": "2025-01-15T10:01:00Z"},
    {"event_id": "3", "event_type": "purchase", "user_id": "user_123", "timestamp": "2025-01-15T10:02:00Z"},
]
for event in events:
    publish_with_ordering(event)
    print(f"Published event: {event['event_id']}")
✨
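Best Practice: Use ordering keys sparingly — only when message order matters. Ordering must be enabled on both the publisher client (PublisherOptions(enable_message_ordering=True)) and the subscription (enable_message_ordering), and publishing is limited to 1 MB/s per ordering key. Group related messages by entity ID (e.g., user_id) and spread load across many different ordering keys, which are processed in parallel.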
Exactly-Once Delivery
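Pub/Sub supports exactly-once delivery. Once a message has been successfully acknowledged, it is not redelivered, and it is not redelivered while its acknowledgment deadline is still outstanding. It is a per-subscription setting (enable_exactly_once_delivery) and is supported only for pull subscriptions (including streaming pull) within a single region. It does not deduplicate messages that are published more than once, so processing should still be idempotent.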
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber import exceptions as sub_exceptions
import json

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "events-sub")


def callback(message):
    """Process one message, then confirm that its acknowledgement succeeded.

    Exactly-once delivery is configured on the subscription
    (``enable_exactly_once_delivery``). The client's job is to ack only after
    successful processing and to check the ack result.
    ``process_event`` is the application's handler, assumed to be defined
    elsewhere.
    """
    try:
        data = json.loads(message.data.decode("utf-8"))
        process_event(data)
    except Exception as e:
        # Processing failed: nack so the message is redelivered according to
        # the subscription's retry and dead-letter policy.
        message.nack()
        print(f"Failed to process message {message.message_id}: {e}")
        return

    # With exactly-once delivery an ack can fail (for example, if the ack
    # deadline already expired). The message may then be redelivered, so
    # process_event should still be idempotent.
    ack_future = message.ack_with_response()
    try:
        ack_future.result()
        print(f"Processed message: {message.message_id}")
    except sub_exceptions.AcknowledgeError as e:
        print(f"Ack for message {message.message_id} failed: {e.error_code}")


# subscribe() has no exactly-once flag; the subscription's setting applies.
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

# Keep the main thread alive. The client (not the future) is the context
# manager, and it closes the client on exit.
with subscriber:
    try:
        streaming_pull_future.result()
    except KeyboardInterrupt:
        streaming_pull_future.cancel()  # request shutdown
        streaming_pull_future.result()  # wait until shutdown completes
BigQuery Subscriptions
BigQuery subscriptions allow direct message delivery to BigQuery tables without intermediate processing.
from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()

# Create BigQuery subscription. The Pub/Sub service agent needs write access
# to the target table.
subscription_path = subscriber.subscription_path(
    "my-project", "events-to-bigquery"
)
subscription = subscriber.create_subscription(
    request={
        "name": subscription_path,
        "topic": "projects/my-project/topics/events",
        "bigquery_config": {
            # BigQuery table ID in {project}.{dataset}.{table} form, not a
            # resource path.
            "table": "my-project.analytics.events",
            # Also write message metadata (e.g. message_id, publish_time,
            # attributes) into extra columns that the table must define.
            "write_metadata": True,
            # Map the topic schema's fields to same-named table columns.
            "use_topic_schema": True,
        },
        "ack_deadline_seconds": 60,
        # No enable_exactly_once_delivery: exactly-once delivery is only
        # supported for pull subscriptions.
    }
)
Filtering and Message Selection
from google.cloud import pubsub_v1
import json

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path("my-project", "events")

# Create the filtered subscription first: a subscription only receives
# messages published after it exists. Filters can match only message
# attributes, referenced as attributes.<key>. Messages that do not match are
# acknowledged automatically and never delivered to this subscription.
subscription = subscriber.create_subscription(
    request={
        "name": "projects/my-project/subscriptions/purchases-only",
        "topic": topic_path,
        "filter": 'attributes.event_type = "purchase" AND attributes.region = "us-east1"',
    }
)

# Publish with attributes for filtering
event = {"event_id": "4", "event_type": "purchase", "user_id": "user_123", "amount": 49.99}
future = publisher.publish(
    topic_path,
    data=json.dumps(event).encode("utf-8"),
    event_type="purchase",  # Attribute for filtering
    region="us-east1",
)
future.result()  # wait until the message is actually published
Dead-Letter Topics
Dead-letter topics capture messages that fail processing after maximum delivery attempts.
from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()

# Create dead-letter topic
publisher_client = pubsub_v1.PublisherClient()
dead_letter_topic = publisher_client.create_topic(
    request={"name": "projects/my-project/topics/events-deadletter"}
)

# Give the dead-letter topic its own subscription. Without one, forwarded
# messages are not retained. monitor_dead_letters() reads from it.
dead_letter_subscription = subscriber.create_subscription(
    request={
        "name": "projects/my-project/subscriptions/events-deadletter-sub",
        "topic": dead_letter_topic.name,
    }
)

# Create subscription with dead-letter policy. The Pub/Sub service agent needs
# roles/pubsub.publisher on the dead-letter topic and roles/pubsub.subscriber
# on this subscription, otherwise messages are not forwarded.
subscription = subscriber.create_subscription(
    request={
        "name": "projects/my-project/subscriptions/events-sub",
        "topic": "projects/my-project/topics/events",
        "dead_letter_policy": {
            "dead_letter_topic": dead_letter_topic.name,
            "max_delivery_attempts": 5,  # allowed range is 5-100
        },
    }
)


def monitor_dead_letters(subscription_path):
    """Stream messages from a dead-letter subscription and log them.

    Args:
        subscription_path: Full path of a subscription attached to the
            dead-letter topic (not the source subscription).

    Returns:
        The streaming-pull future. Call ``.result()`` to block while
        monitoring, or ``.cancel()`` to stop.
    """
    dl_subscriber = pubsub_v1.SubscriberClient()

    def callback(message):
        # Forwarded messages carry CloudPubSubDeadLetterSource* attributes
        # (e.g. source subscription, delivery count) useful for triage.
        print(
            f"Dead-letter message {message.message_id}: {message.data!r} "
            f"attributes={dict(message.attributes)}"
        )
        # Analyze, fix or republish the message before this point; acking
        # removes it from the dead-letter subscription.
        message.ack()

    return dl_subscriber.subscribe(subscription_path, callback=callback)
⚠️
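Warning: Dead-letter topics are generally available, but they only work if the Pub/Sub service agent has publisher permission on the dead-letter topic and subscriber permission on the source subscription. The dead-letter topic also needs its own subscription, or forwarded messages are lost. Configure max_delivery_attempts carefully (allowed range: 5–100) — too low may cause premature dead-lettering, too high delays failure detection. Start with 5 attempts and adjust based on your error patterns.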
Performance and Scaling
Common Interview Questions
Q1: When would you use Pub/Sub vs. Cloud Tasks?
Answer: Pub/Sub is for event-driven, decoupled communication between multiple producers and consumers. Cloud Tasks is for directed, asynchronous task execution with rate limiting. Use Pub/Sub for event streaming, notifications, and fan-out patterns. Use Cloud Tasks for scheduled jobs, rate-limited API calls, and task queues.
Q2: Explain exactly-once delivery in Pub/Sub.
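Answer: Exactly-once delivery guarantees that once a message is successfully acknowledged, it is not redelivered, and that it is not redelivered while its acknowledgment deadline is outstanding. It applies to pull subscriptions within a single region and is enabled on the subscription with enable_exactly_once_delivery=True. It does not deduplicate repeated publishes (each publish gets a new message ID). Subscribers should therefore still process messages idempotently and use ack_with_response() to confirm that each acknowledgment succeeded.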
Q3: How do you handle message ordering in Pub/Sub?
Answer: Use ordering keys to group related messages. Messages with the same ordering key are delivered in publish order, provided ordering is enabled on both the publisher client and the subscription. Publishing is limited to 1 MB/s per ordering key. Use ordering only when strict ordering is required — for most use cases, eventual ordering with timestamps is sufficient.
Q4: What is the purpose of dead-letter topics?
Answer: Dead-letter topics capture messages that fail processing after maximum delivery attempts. They prevent poison messages from blocking the queue. Monitor dead-letter topics to identify and fix processing errors. Configure max_delivery_attempts based on your error patterns (the allowed range is 5–100; 5–10 is a common starting point).
Q5: How do you optimize Pub/Sub for high-throughput streaming?
Answer: 1) Increase acknowledgment deadline for slow processors, 2) Use pull subscriptions for batch processing, 3) Distribute across multiple ordering keys, 4) Use Cloud Storage or BigQuery subscriptions for direct sinks, 5) Enable message ordering only when required, 6) Use flow control to manage backpressure.