Message Schemas
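from google.cloud import pubsub_v1
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Encoding, Schema

publisher = pubsub_v1.PublisherClient()
# Schemas are managed by SchemaServiceClient, not PublisherClient
schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path("my-project", "event-schema")

# Create schema. Keep the definition self-contained (a single top-level
# message with no imports), so the timestamp is carried as epoch millis.
schema = schema_client.create_schema(
    request={
        "parent": "projects/my-project",
        "schema_id": "event-schema",
        "schema": Schema(
            name=schema_path,
            type_=Schema.Type.PROTOCOL_BUFFER,
            definition="""
syntax = "proto3";
message Event {
  string event_id = 1;
  string event_type = 2;
  string user_id = 3;
  int64 timestamp_millis = 4;
  double amount = 5;
}
""",
        ),
    }
)
# Create topic with schema validation
topic = publisher.create_topic(
    request={
        "name": "projects/my-project/topics/events-validated",
        "schema_settings": {
            "schema": schema.name,
            "encoding": Encoding.BINARY,
        },
    }
)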
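Pub/Sub performs the authoritative schema check when a message is published, but a cheap client-side pre-check can catch obvious mistakes before a network round trip. The sketch below is a hypothetical helper (`validate_event` and `EVENT_FIELDS` are not part of the Pub/Sub library) and assumes the timestamp travels as epoch milliseconds in a field named `timestamp_millis`. It is deliberately stricter than proto3, where every field is optional.

```python
from typing import Any

# Hypothetical mapping of the Event schema fields to accepted Python types.
# Requiring every field is our own rule; proto3 itself treats fields as optional.
EVENT_FIELDS: dict[str, tuple[type, ...]] = {
    "event_id": (str,),
    "event_type": (str,),
    "user_id": (str,),
    "timestamp_millis": (int,),
    "amount": (int, float),
}


def validate_event(event: dict[str, Any]) -> list[str]:
    """Return a list of problems; an empty list means the event looks valid."""
    problems = []
    for name, types in EVENT_FIELDS.items():
        if name not in event:
            problems.append(f"missing field: {name}")
        # bool is a subclass of int in Python, so reject it explicitly
        elif isinstance(event[name], bool) or not isinstance(event[name], types):
            problems.append(f"wrong type for {name}: {type(event[name]).__name__}")
    for name in event:
        if name not in EVENT_FIELDS:
            problems.append(f"unknown field: {name}")
    return problems
```

Running the check before calling `publisher.publish` keeps clearly broken events out of the publish path, while the server-side schema remains the source of truth.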
Message Filtering
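import json

# Create subscription with filter
subscriber = pubsub_v1.SubscriberClient()
# Filters match message attributes only (not the payload). Attribute values
# are strings, so numeric comparisons such as amount > 100 are not supported.
subscription = subscriber.create_subscription(
    request={
        "name": "projects/my-project/subscriptions/purchases-only",
        "topic": "projects/my-project/topics/events",
        "filter": 'attributes.event_type = "purchase"',
    }
)
# Publish with attributes for filtering
topic_path = "projects/my-project/topics/events"
event = {"event_id": "e-123", "event_type": "purchase", "user_id": "u-42", "amount": 150.0}
future = publisher.publish(
    topic_path,
    data=json.dumps(event).encode("utf-8"),
    # Attribute values must be strings
    event_type="purchase",
    amount="150.00",
)
future.result()  # blocks until Pub/Sub returns the message ID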
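Because filters compare string attributes with operators such as `=`, a threshold like "amount over 100" has to be decided by the publisher. One possible workaround, sketched below under that assumption, is to precompute a bucket attribute; `value_tier`, `HIGH_VALUE_THRESHOLD`, `build_attributes`, and `matches` are hypothetical names, and `matches` is only a local stand-in for an AND-of-equalities filter, not how Pub/Sub evaluates filters.

```python
HIGH_VALUE_THRESHOLD = 100.0  # hypothetical business threshold


def build_attributes(event: dict) -> dict[str, str]:
    """Derive the string attributes to attach to a published message."""
    return {
        "event_type": event["event_type"],
        "amount": f"{event['amount']:.2f}",
        # Hypothetical bucket attribute so subscribers can filter with `=`
        "value_tier": "high" if event["amount"] > HIGH_VALUE_THRESHOLD else "normal",
    }


def matches(attributes: dict[str, str], required: dict[str, str]) -> bool:
    """Local stand-in for a filter made of attributes.KEY = "VALUE" terms
    joined by AND. Pub/Sub evaluates the real filter server-side."""
    return all(attributes.get(key) == value for key, value in required.items())
```

With this in place, the publisher would call `publisher.publish(topic_path, data=..., **build_attributes(event))`, and a subscription could use the filter `'attributes.event_type = "purchase" AND attributes.value_tier = "high"'`.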
BigQuery Subscriptions
# Create BigQuery subscription
subscription = subscriber.create_subscription(
    request={
        "name": "projects/my-project/subscriptions/events-to-bq",
        # use_topic_schema requires a topic that has a schema attached
        "topic": "projects/my-project/topics/events-validated",
        "bigquery_config": {
            # The table ID uses the PROJECT.DATASET.TABLE format
            "table": "my-project.analytics.events",
            "write_metadata": True,
            "use_topic_schema": True,
        },
        # enable_exactly_once_delivery is omitted: exactly-once delivery
        # is supported for pull subscriptions only
    }
)
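Before creating a BigQuery subscription with `use_topic_schema=True`, it can help to check that the table has a column for every schema field. The sketch below is a hypothetical pre-flight helper that compares names only (no type checks). It assumes that each top-level schema field needs a same-named column and that `write_metadata=True` writes the `subscription_name`, `message_id`, `publish_time`, and `attributes` columns; confirm both assumptions against the current `BigQueryConfig` reference.

```python
# Assumed metadata columns written when write_metadata=True.
METADATA_COLUMNS = ("subscription_name", "message_id", "publish_time", "attributes")


def missing_columns(schema_fields, table_columns, write_metadata=True):
    """Return required column names absent from the table, in a stable order."""
    required = list(schema_fields)
    if write_metadata:
        required += METADATA_COLUMNS
    present = set(table_columns)
    return [name for name in required if name not in present]
```

An empty result means every expected column name exists; it does not prove that the column types are compatible.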
Best Practice: Use schema validation to reject malformed messages at publish time. Use filtering to deliver only relevant messages to each subscription. Use BigQuery subscriptions to load data directly without writing a separate consumer. Use dead-letter topics to capture messages that repeatedly fail processing.
Common Interview Questions
Q1: What are the benefits of message schemas?
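Answer: Schemas define a message structure that Pub/Sub validates at publish time, so malformed messages are rejected before they reach any subscriber. They give consumers a typed contract, document the message format, and let you generate code from the Avro or Protocol Buffer definition.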
Q2: How does Pub/Sub filtering work?
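Answer: Filtering uses expressions over message attributes (not the message data) so that a subscription receives only matching messages. Filters support `=`, `!=`, `:` (attribute exists), `hasPrefix()`, and `AND`/`OR`/`NOT`; attribute values are compared as strings. Pub/Sub automatically acknowledges messages that don't match, so subscribers never see them, which reduces unnecessary processing.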
Q3: When would you use BigQuery subscriptions?
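Answer: BigQuery subscriptions write messages directly into a BigQuery table, with no subscriber code or intermediate pipeline. Use them for simple ingestion where no transformation is needed. They can map message fields to columns using the topic schema (`use_topic_schema`) and optionally write message metadata (`write_metadata`). They do not offer exactly-once delivery, which is limited to pull subscriptions.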
Q4: What is the purpose of dead-letter topics?
Answer: Dead-letter topics capture messages that still fail after the maximum number of delivery attempts (configurable from 5 to 100). They stop poison messages from being redelivered indefinitely and let you analyze and reprocess failures separately.
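The redelivery loop behind a dead-letter policy can be illustrated locally. The sketch below is a simulation, not the Pub/Sub client: `simulate_delivery` is a hypothetical function, and in real Pub/Sub you would instead set the subscription's `dead_letter_policy` with `dead_letter_topic` and `max_delivery_attempts`, where the attempt count is tracked by the service on a best-effort basis.

```python
from collections import deque


def simulate_delivery(messages, handler, max_delivery_attempts=5):
    """Deliver each message, redelivering failures until the attempt limit.

    Returns (processed, dead_lettered) lists of messages.
    """
    pending = deque((message, 1) for message in messages)  # (message, attempt)
    processed, dead_lettered = [], []
    while pending:
        message, attempt = pending.popleft()
        try:
            handler(message)
        except Exception:
            if attempt >= max_delivery_attempts:
                dead_lettered.append(message)  # forwarded to the dead-letter topic
            else:
                pending.append((message, attempt + 1))  # nack: redeliver later
        else:
            processed.append(message)  # handler succeeded: ack
    return processed, dead_lettered
```

A poison message is retried up to the limit and then set aside, while healthy messages keep flowing.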
Q5: How do you handle message ordering?
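Answer: Use ordering keys so that messages with the same key are delivered in the order they were published; message ordering must be enabled on both the publisher and the subscription. Publish throughput is limited to 1 MB/s per ordering key, so use ordering only when strict ordering is required.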
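The guarantee is per key, not global: messages with different keys may interleave in any order. The sketch below is a hypothetical local checker (not part of the client library) that tests whether a delivered sequence respects publish order within each key, and for simplicity it ignores redelivery. In the real client you would publish with `ordering_key=...` on a publisher created with `PublisherOptions(enable_message_ordering=True)` and set `enable_message_ordering` on the subscription.

```python
from collections import defaultdict


def per_key_sequences(messages):
    """Group (ordering_key, payload) pairs by key, preserving their order."""
    sequences = defaultdict(list)
    for key, payload in messages:
        sequences[key].append(payload)
    return dict(sequences)


def respects_ordering(published, delivered):
    """True if every key's payloads arrive in the same order they were published."""
    return per_key_sequences(published) == per_key_sequences(delivered)
```

Comparing per-key sequences, rather than the full list, mirrors the fact that ordering keys only constrain messages that share a key.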