Cloud Pub/Sub for Messaging and Event Streaming

[Figure: Pub/Sub architecture. Publisher (message producer), Topic (message channel), Subscription (message delivery), Subscriber (message consumer), Dead Letter (failed messages), Pull Subscription (subscriber initiated), Push Subscription (server initiated), Message Ordering (sequence keys), Exactly-Once (delivery semantics).]

Pub/Sub Architecture

Cloud Pub/Sub is a fully managed, real-time messaging service that allows you to send and receive messages between applications.

Core Components

Topics:

  • Named resources to which publishers send messages
  • Each message is delivered to every attached subscription
  • Global endpoint access

Subscriptions:

  • Represent the stream of messages from one topic to its subscribers
  • Support for pull and push delivery
  • At-least-once delivery by default; optional exactly-once delivery (pull subscriptions only)
  • Message acknowledgment with a configurable ack deadline
  • Optional attribute-based message filtering
  • Configurable retention periods

Messages:

  • Immutable data payloads
  • Custom attributes for metadata
  • Optional ordering keys
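To make the relationship between these components concrete, here is a minimal in-memory model. It assumes a single process and ignores networking, persistence, and ack deadlines expiring on a timer; ToyTopic and ToySubscription are hypothetical names, not the Pub/Sub client API. It shows two properties listed above: every subscription attached to a topic gets its own copy of each message, and a message stays pending on a subscription (and can be delivered again) until it is acknowledged.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class ToySubscription:
    """Hypothetical stand-in for a subscription: a queue of unacknowledged messages."""
    name: str
    pending: deque = field(default_factory=deque)

    def pull(self) -> Optional[Tuple[str, bytes]]:
        """Deliver the front message and move it to the back; it stays pending until acked."""
        if not self.pending:
            return None
        message = self.pending[0]
        self.pending.rotate(-1)  # unacked messages come around again (at-least-once)
        return message

    def ack(self, message_id: str) -> None:
        """Acknowledge a message so it is never delivered again on this subscription."""
        self.pending = deque(m for m in self.pending if m[0] != message_id)


@dataclass
class ToyTopic:
    """Hypothetical stand-in for a topic: fans each message out to all subscriptions."""
    subscriptions: List[ToySubscription] = field(default_factory=list)
    next_id: int = 0

    def publish(self, data: bytes) -> str:
        self.next_id += 1
        message_id = str(self.next_id)
        for sub in self.subscriptions:
            sub.pending.append((message_id, data))  # every subscription gets its own copy
        return message_id
```

Acknowledging a message on one subscription does not affect any other subscription: each one tracks acknowledgments independently, which is what lets several services consume the same topic.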

Topic and Subscription Management

Creating Topics

# Create a topic
gcloud pubsub topics create my-topic

# Create a topic with labels
gcloud pubsub topics create my-topic \
    --labels=env=production,team=data-engineering

# Create a topic with message retention
gcloud pubsub topics create my-topic \
    --message-retention-duration=86400s  # 1 day

Creating Subscriptions

# Create a pull subscription
gcloud pubsub subscriptions create my-subscription \
    --topic=my-topic \
    --ack-deadline=60

# Create a push subscription
gcloud pubsub subscriptions create my-push-subscription \
    --topic=my-topic \
    --push-endpoint=https://my-endpoint.example.com/pubsub/push \
    --ack-deadline=30

# Create a subscription with message retention
gcloud pubsub subscriptions create my-subscription \
    --topic=my-topic \
    --message-retention-duration=604800s  # 7 days

Python Client Library

from google.cloud import pubsub_v1
import json

# Initialize publisher
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('my-project', 'my-topic')

# Publish a message
data = json.dumps({
    'event': 'user_signup',
    'user_id': 'user_123',
    'timestamp': '2024-01-15T10:30:00Z'
})

future = publisher.publish(
    topic_path,
    data.encode('utf-8'),
    event_type='user_event',
    source='web_app'
)
print(f'Published message ID: {future.result()}')

# Initialize subscriber
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path('my-project', 'my-subscription')

# Synchronously pull up to 10 messages (the response may contain fewer, or none)
response = subscriber.pull(
    request={'subscription': subscription_path, 'max_messages': 10},
    timeout=30
)

ack_ids = []
for received in response.received_messages:
    print(f'Received message: {received.message.data.decode("utf-8")}')
    print(f'Attributes: {dict(received.message.attributes)}')
    ack_ids.append(received.ack_id)

# Acknowledge all processed messages in a single request
if ack_ids:
    subscriber.acknowledge(
        request={'subscription': subscription_path, 'ack_ids': ack_ids}
    )

Pull vs Push Subscriptions

Pull Subscription

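# Pull subscription example (streaming pull with the high-level client)
from concurrent import futures

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path('my-project', 'my-subscription')

def process_message(message):
    """Placeholder for application-specific processing (hypothetical)."""
    print(f'Processing: {message.data.decode("utf-8")}')

def callback(message):
    """Process a received message (runs on the client's callback thread pool)."""
    print(f'Received message: {message.data}')
    print(f'Message ID: {message.message_id}')
    print(f'Attributes: {message.attributes}')

    # Process the message
    try:
        process_message(message)
        message.ack()  # Acknowledge successful processing
    except Exception as e:
        print(f'Processing failed: {e}')
        message.nack()  # Negative acknowledgment: request redelivery

# Subscribe to messages; delivery happens in background threads
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

with subscriber:  # Closes the client when done
    try:
        streaming_pull_future.result(timeout=300)  # Listen for 5 minutes
    except (futures.TimeoutError, KeyboardInterrupt):
        streaming_pull_future.cancel()  # Trigger shutdown
        streaming_pull_future.result()  # Block until shutdown completes
        print('Subscriber stopped.')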

Push Subscription

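# Push subscription endpoint
import base64
import json

from flask import Flask, request

app = Flask(__name__)

@app.route('/pubsub/push', methods=['POST'])
def pubsub_push():
    """Handle Pub/Sub push messages."""
    envelope = request.get_json(silent=True)

    # Reject bodies that are not a Pub/Sub push envelope
    if not envelope or 'message' not in envelope:
        return 'Bad request', 400

    message = envelope['message']
    # The payload arrives base64-encoded in message.data
    payload = base64.b64decode(message.get('data', '')).decode('utf-8')
    data = json.loads(payload) if payload else None
    attributes = message.get('attributes', {})

    # Process the message
    process_message(data, attributes)

    # A success status (such as 200 or 204) acknowledges the message;
    # other status codes make Pub/Sub retry the delivery
    return 'OK', 200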

def process_message(data, attributes):
    """Process the received message."""
    print(f'Processing: {data}')
    print(f'Attributes: {attributes}')

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)
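The decoding step of a push handler can be tested without Flask or a live subscription. The sketch below builds an envelope shaped like a push request body (a JSON object whose message.data field is base64-encoded, next to attributes, messageId, and the subscription name) and decodes it. decode_push_envelope is a hypothetical helper, and the sample values are made up.

```python
import base64
import json
from typing import Any, Dict, Optional, Tuple


def decode_push_envelope(envelope: Optional[Dict[str, Any]]) -> Tuple[Any, Dict[str, str]]:
    """Return (decoded JSON payload, attributes); raise ValueError on a malformed envelope."""
    if not envelope or 'message' not in envelope:
        raise ValueError('not a Pub/Sub push envelope')
    message = envelope['message']
    raw = base64.b64decode(message.get('data', ''))  # data is base64 in push bodies
    payload = json.loads(raw) if raw else None
    return payload, message.get('attributes', {})


# Example envelope shaped like a push request body (made-up values)
sample = {
    'message': {
        'data': base64.b64encode(json.dumps({'event': 'user_signup'}).encode()).decode(),
        'attributes': {'event_type': 'user_event'},
        'messageId': '123',
    },
    'subscription': 'projects/my-project/subscriptions/my-push-subscription',
}
```

Keeping the decoding in a plain function like this lets the HTTP route stay thin and makes malformed-envelope handling easy to unit test.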

Comparison

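Feature | Pull Subscription | Push Subscription
Control | Subscriber decides when to fetch messages | Pub/Sub sends messages to an HTTPS endpoint as they arrive
Scalability | Well suited to high throughput; the subscriber sets flow control | Limited by endpoint capacity
Reliability | Subscriber controls acks, nacks, and ack-deadline extensions | Depends on endpoint availability; failed deliveries are retried
Complexity | More complex: you run and scale subscriber clients | Simpler: you expose an HTTPS endpoint
Cost | Billed by message throughput (data volume) | Billed by message throughput, same as pull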

Message Ordering

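Message ordering ensures that messages with the same ordering key, published in the same region, are delivered in the order Pub/Sub received them. Ordering must be enabled both on the publisher client and on the subscription. Messages with different ordering keys can still be delivered in parallel and in any relative order.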
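Because the guarantee is per key rather than global, a delivery log can be checked for it mechanically. This sketch assumes each delivered message is reduced to an (ordering_key, sequence) pair, for example by converting a custom sequence_number attribute with int(); respects_per_key_order is a hypothetical helper. Redeliveries of already-seen messages are tolerated, since delivery is still at-least-once.

```python
from collections import defaultdict
from typing import Dict, Iterable, Set, Tuple


def respects_per_key_order(delivered: Iterable[Tuple[str, int]]) -> bool:
    """Return True if first deliveries arrive in increasing sequence order for every key."""
    highest: Dict[str, int] = {}
    seen: Dict[str, Set[int]] = defaultdict(set)
    for key, seq in delivered:
        if seq in seen[key]:
            continue  # redelivery of an earlier message: allowed under at-least-once
        if key in highest and seq < highest[key]:
            return False  # an earlier-published message arrived after a later one
        seen[key].add(seq)
        highest[key] = seq
    return True
```

Interleaving across keys (user_1, user_2, user_1, ...) passes this check; only reordering within one key fails it.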

Enabling Message Ordering

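# Publish with ordering key
from google.cloud import pubsub_v1

# Ordering must be enabled on the publisher client; otherwise publishing
# a message with an ordering_key raises an error
publisher = pubsub_v1.PublisherClient(
    publisher_options=pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
)
topic_path = publisher.topic_path('my-project', 'my-ordered-topic')

# Create the subscription before publishing: a subscription only receives
# messages published after it exists. Ordering is fixed at creation time.
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path('my-project', 'my-ordered-subscription')

subscription = subscriber.create_subscription(
    request={
        'name': subscription_path,
        'topic': topic_path,
        'enable_message_ordering': True,
        'ack_deadline_seconds': 60
    }
)

# Messages with the same ordering key are delivered in order
for i in range(10):
    data = f'Message {i}'
    future = publisher.publish(
        topic_path,
        data.encode('utf-8'),
        ordering_key='user_123',  # Same ordering key
        sequence_number=str(i)  # Optional custom attribute, not required for ordering
    )
    try:
        print(f'Published message {i}: {future.result()}')
    except Exception as e:
        # A failed publish pauses this ordering key until it is resumed
        print(f'Publish failed for message {i}: {e}')
        publisher.resume_publish(topic_path, 'user_123')
        break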

Ordering Key Best Practices

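# Good: key on the entity whose events must stay in order
user_id, session_id, account_id = '123', 'abc', '42'  # Example values
ordering_key = f'user_{user_id}'  # Per-user ordering
ordering_key = f'session_{session_id}'  # Per-session ordering
ordering_key = f'account_{account_id}'  # Per-account ordering

# Bad: one key (or very few keys) for all messages
ordering_key = 'all_events'
# Messages with the same key are delivered one after another, and publish
# throughput per ordering key is limited (about 1 MB/s), so a single hot key
# caps throughput. Many independent keys let different keys flow in parallel.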

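# Monitor the subscription backlog (a growing backlog on an ordered
# subscription can indicate a slow or stuck ordering key)
import time

from google.cloud import monitoring_v3

client = monitoring_v3.MetricServiceClient()
project_name = 'projects/my-project'

# Query the last hour; TimeInterval takes Timestamp values, not floats
now = int(time.time())
interval = monitoring_v3.TimeInterval(
    {
        'end_time': {'seconds': now},
        'start_time': {'seconds': now - 3600},
    }
)

results = client.list_time_series(
    request={
        'name': project_name,
        'filter': 'metric.type="pubsub.googleapis.com/subscription/num_undelivered_messages"',
        'interval': interval,
        'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL
    }
)

# Points are returned newest first
for series in results:
    subscription_id = series.resource.labels['subscription_id']
    latest = series.points[0].value.int64_value if series.points else None
    print(f'{subscription_id}: {latest} undelivered messages')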
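Since Pub/Sub limits publish throughput per ordering key (roughly 1 MB/s according to the published quotas at the time of writing; verify the current value), it can help to scan a sample of outgoing traffic for keys that dominate it. This is a sketch under that assumption: find_hot_keys and the 80% headroom default are hypothetical choices, not part of any Google library.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

# Assumed per-ordering-key publish limit in bytes per second; verify against current quotas
PER_KEY_LIMIT_BYTES_PER_SEC = 1_000_000


def find_hot_keys(
    samples: Iterable[Tuple[str, int]],
    window_seconds: float,
    limit: float = PER_KEY_LIMIT_BYTES_PER_SEC,
    headroom: float = 0.8,
) -> List[str]:
    """Return keys whose average byte rate over the window exceeds headroom * limit.

    samples: (ordering_key, message_size_bytes) pairs observed during the window.
    """
    bytes_per_key: Dict[str, int] = defaultdict(int)
    for key, size in samples:
        bytes_per_key[key] += size
    threshold = headroom * limit
    return sorted(k for k, total in bytes_per_key.items() if total / window_seconds > threshold)
```

A key flagged here is a candidate for splitting into finer-grained keys, provided the application only needs ordering within those smaller groups.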

Dead-Letter Topics

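Dead-letter topics capture messages that cannot be processed after a specified number of delivery attempts. When a message exceeds the subscription's maximum delivery attempts (between 5 and 100; the default is 5), Pub/Sub forwards it to the dead-letter topic. For forwarding to work, the Pub/Sub service agent (service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com) needs the Publisher role on the dead-letter topic and the Subscriber role on the source subscription.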
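The delivery-attempt accounting can be modeled in a few lines. This simplified sketch assumes a synchronous handler that raises on failure and ignores retry backoff; deliver_with_dead_letter is a hypothetical function, not part of the client library, and the real service tracks attempts itself.

```python
from typing import Callable, List, Tuple


def deliver_with_dead_letter(
    message: bytes,
    handler: Callable[[bytes], None],
    max_delivery_attempts: int,
    dead_letter: List[Tuple[bytes, int]],
) -> bool:
    """Redeliver until the handler succeeds or attempts run out.

    Returns True if the message was processed; otherwise appends
    (message, attempts) to dead_letter and returns False.
    """
    if not 5 <= max_delivery_attempts <= 100:
        raise ValueError('max_delivery_attempts must be between 5 and 100')
    for _ in range(max_delivery_attempts):
        try:
            handler(message)  # success models an ack
            return True
        except Exception:
            continue  # failure models a nack or an expired ack deadline
    dead_letter.append((message, max_delivery_attempts))
    return False
```

A transient failure is absorbed by a later attempt, while a "poison" message that always fails ends up in the dead-letter list instead of blocking the subscription forever.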

Configuring Dead-Letter Topics

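# Create dead-letter topic
gcloud pubsub topics create dead-letter-topic

# Attach a subscription to it; without one, dead-lettered messages are lost
gcloud pubsub subscriptions create dead-letter-subscription \
    --topic=dead-letter-topic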

# Create subscription with dead-letter policy
gcloud pubsub subscriptions create my-subscription \
    --topic=my-topic \
    --dead-letter-topic=dead-letter-topic \
    --max-delivery-attempts=5

# Update existing subscription with dead-letter policy
gcloud pubsub subscriptions update my-subscription \
    --dead-letter-topic=dead-letter-topic \
    --max-delivery-attempts=5

Python Implementation

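from concurrent import futures

from google.cloud import pubsub_v1

def log_dead_letter(message):
    """Placeholder: persist the message for investigation (hypothetical)."""
    print(f'Logged dead letter {message.message_id}')

def send_alert(text):
    """Placeholder: notify the on-call engineer (hypothetical)."""
    print(f'ALERT: {text}')

# Dead-letter subscription handler
def process_dead_letter(message):
    """Process messages from dead-letter topic."""
    attrs = message.attributes
    print(f'Dead letter message: {message.data}')
    # Pub/Sub adds these attributes when it forwards a message to a dead-letter topic
    print(f'Source subscription: {attrs.get("CloudPubSubDeadLetterSourceSubscription")}')
    delivery_count = int(attrs.get('CloudPubSubDeadLetterSourceDeliveryCount', 0))
    print(f'Delivery attempts on source subscription: {delivery_count}')

    # Log for investigation
    log_dead_letter(message)

    # Every message here already exhausted its delivery attempts
    send_alert(f'Message {message.message_id} failed {delivery_count} times')

    message.ack()

# Subscribe to the subscription attached to the dead-letter topic
subscriber = pubsub_v1.SubscriberClient()
dlq_subscription_path = subscriber.subscription_path(
    'my-project', 'dead-letter-subscription'
)

future = subscriber.subscribe(dlq_subscription_path, callback=process_dead_letter)

with subscriber:
    try:
        future.result(timeout=300)
    except futures.TimeoutError:
        future.cancel()
        future.result()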

Dead-Letter Policy Configuration

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
topic_path = subscriber.topic_path('my-project', 'my-topic')
subscription_path = subscriber.subscription_path('my-project', 'my-subscription')

# Create subscription with dead-letter policy
subscription = subscriber.create_subscription(
    request={
        'name': subscription_path,
        'topic': topic_path,
        'dead_letter_policy': {
            'dead_letter_topic': 'projects/my-project/topics/dead-letter-topic',
            'max_delivery_attempts': 5
        },
        'ack_deadline_seconds': 60,
        'message_retention_duration': {'seconds': 86400}  # 1 day
    }
)

print(f'Subscription created: {subscription.name}')

Exactly-Once Delivery

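When exactly-once delivery is enabled on a pull subscription, Pub/Sub does not redeliver a message after it has been successfully acknowledged, or while its acknowledgment deadline has not expired. The guarantee applies within a single cloud region and is based on the Pub/Sub message ID, so it does not remove duplicates that a publisher sends twice (each copy gets its own message ID). Subscribers should confirm that acknowledgments succeeded; the Python client provides message.ack_with_response() for this.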

Enabling Exactly-Once Delivery

from google.cloud import pubsub_v1

# Create the topic (exactly-once delivery is configured on the subscription, not the topic)
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('my-project', 'my-exactly-once-topic')

topic = publisher.create_topic(
    request={
        'name': topic_path,
        'message_retention_duration': {'seconds': 86400}
    }
)

# Create subscription with exactly-once delivery
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    'my-project', 'my-exactly-once-subscription'
)

subscription = subscriber.create_subscription(
    request={
        'name': subscription_path,
        'topic': topic_path,
        'enable_exactly_once_delivery': True,
        'ack_deadline_seconds': 60
    }
)

Message Deduplication

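# Publish with an application-level deduplication ID
import hashlib
import json

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('my-project', 'my-exactly-once-topic')

# Generate deduplication ID from the payload. Identical payloads get the same
# ID, so the payload should include a business key (here order_id).
def generate_dedup_id(data):
    """Generate a deterministic deduplication ID for a payload string."""
    return hashlib.sha256(data.encode('utf-8')).hexdigest()

# sort_keys=True keeps the serialized form (and therefore the hash) stable
data = json.dumps({'order_id': 'order_123', 'amount': 99.99}, sort_keys=True)
dedup_id = generate_dedup_id(data)

# deduplication_id is an ordinary custom attribute: Pub/Sub does not
# deduplicate on it, so subscribers must check it themselves
future = publisher.publish(
    topic_path,
    data.encode('utf-8'),
    deduplication_id=dedup_id,
    event_type='order_created'
)

print(f'Published message {future.result()} with dedup ID: {dedup_id}')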
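Because Pub/Sub ignores a custom deduplication_id attribute, the subscriber has to enforce it. A minimal sketch, assuming the subscriber reads that attribute and remembers a bounded number of recent IDs in memory; RecentIdCache is a hypothetical helper. A production system would more likely record processed IDs in a durable store, for example a database table with a unique constraint, so deduplication survives restarts.

```python
from collections import OrderedDict


class RecentIdCache:
    """Remember the most recent `capacity` IDs; evict the least recently seen when full."""

    def __init__(self, capacity: int = 10_000) -> None:
        self.capacity = capacity
        self._ids: "OrderedDict[str, None]" = OrderedDict()

    def first_time(self, dedup_id: str) -> bool:
        """Return True the first time an ID is seen, False for a duplicate."""
        if dedup_id in self._ids:
            self._ids.move_to_end(dedup_id)  # refresh recency
            return False
        self._ids[dedup_id] = None
        if len(self._ids) > self.capacity:
            self._ids.popitem(last=False)  # drop the least recently seen ID
        return True


# Inside a subscriber callback (sketch):
#   if cache.first_time(message.attributes['deduplication_id']):
#       handle(message)   # hypothetical processing function
#   message.ack()         # ack duplicates too, so they are not redelivered
```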

Message Filtering

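Message filtering allows subscribers to receive only messages whose attributes match a filter expression set on the subscription. Filters apply to attributes, not to the message data, and Pub/Sub automatically acknowledges messages that don't match.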

Filter Syntax

# Create subscription with filter
gcloud pubsub subscriptions create filtered-subscription \
    --topic=my-topic \
    --filter='attributes.event_type = "purchase"'

# Complex filter
gcloud pubsub subscriptions create complex-filtered-subscription \
    --topic=my-topic \
    --filter='attributes.region = "us-east1" AND attributes.event_type = "signup"'
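To reason about which messages a filter admits, the toy evaluator below handles only the subset used above: equality tests on attributes joined by AND. It is a hypothetical helper for illustration, not Pub/Sub's filter engine, which also supports operators such as !=, OR, NOT, the : presence test, and hasPrefix().

```python
import re
from typing import Mapping

# Matches one clause like: attributes.event_type = "purchase"
_CLAUSE = re.compile(r'^attributes\.(\w+)\s*=\s*"([^"]*)"$')


def matches(filter_expr: str, attributes: Mapping[str, str]) -> bool:
    """Evaluate an AND-only equality filter against message attributes.

    Simplification: splits on ' AND ', so values containing ' AND ' are unsupported.
    """
    for clause in filter_expr.split(' AND '):
        m = _CLAUSE.match(clause.strip())
        if m is None:
            raise ValueError(f'Unsupported clause: {clause!r}')
        key, expected = m.groups()
        if attributes.get(key) != expected:
            return False  # missing attribute or different value: filtered out
    return True
```

Note that a message lacking the attribute entirely does not match an equality test, so publishers must set every attribute a subscription filters on.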

Python Implementation

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
topic_path = subscriber.topic_path('my-project', 'my-topic')
subscription_path = subscriber.subscription_path(
    'my-project', 'filtered-subscription'
)

# Create subscription with filter
subscription = subscriber.create_subscription(
    request={
        'name': subscription_path,
        'topic': topic_path,
        'filter': 'attributes.event_type = "purchase"',
        'ack_deadline_seconds': 60
    }
)

# Publish messages with attributes
publisher = pubsub_v1.PublisherClient()

# This message will be delivered (event_type matches the filter)
publisher.publish(
    topic_path,
    b'Purchase event',
    event_type='purchase',
    region='us-east1'
).result()

# This message will NOT be delivered; Pub/Sub automatically
# acknowledges messages that don't match the filter
publisher.publish(
    topic_path,
    b'Login event',
    event_type='login',
    region='us-east1'
).result()

Performance Optimization

Throughput Optimization

# Optimize publisher throughput
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient(
    # A batch is sent when any limit is reached; larger batches raise
    # throughput at the cost of per-message latency
    batch_settings=pubsub_v1.types.BatchSettings(
        max_messages=500,
        max_bytes=5 * 1024 * 1024,  # 5MB
        max_latency=0.05,  # Seconds to wait for more messages
    ),
    publisher_options=pubsub_v1.types.PublisherOptions(
        enable_message_ordering=False,
        flow_control=pubsub_v1.types.PublishFlowControl(
            message_limit=1000,
            byte_limit=10 * 1024 * 1024,  # 10MB
            limit_exceeded_behavior=pubsub_v1.types.LimitExceededBehavior.BLOCK
        )
    )
)
topic_path = publisher.topic_path('my-project', 'my-topic')

# publish() is asynchronous and the client batches messages in the
# background, so no extra thread pool is needed
def publish_messages_batch(messages, topic_path):
    """Publish messages, then wait for every publish to complete."""
    futures = [
        publisher.publish(
            topic_path,
            message['data'].encode('utf-8'),
            **message['attributes']
        )
        for message in messages
    ]

    # Wait for all messages to be published
    for future in futures:
        print(f'Published: {future.result()}')

messages = [
    {'data': f'event {i}', 'attributes': {'source': 'batch_job'}}
    for i in range(100)
]
publish_messages_batch(messages, topic_path)
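The Python client groups published messages into batches and sends a batch as one request when it reaches a message-count, byte-size, or latency limit (configured through pubsub_v1.types.BatchSettings). A simplified sketch of the count and size triggers, assuming a single thread and omitting the latency timer; ToyBatcher is hypothetical and not the library's implementation.

```python
from typing import List


class ToyBatcher:
    """Group payloads into batches, flushing on message-count or byte-size limits."""

    def __init__(self, max_messages: int, max_bytes: int) -> None:
        self.max_messages = max_messages
        self.max_bytes = max_bytes
        self.current: List[bytes] = []
        self.current_bytes = 0
        self.sent: List[List[bytes]] = []  # each entry models one publish request

    def add(self, payload: bytes) -> None:
        # Flush first if this payload would push the batch over the byte limit
        if self.current and self.current_bytes + len(payload) > self.max_bytes:
            self.flush()
        self.current.append(payload)
        self.current_bytes += len(payload)
        if len(self.current) >= self.max_messages:
            self.flush()

    def flush(self) -> None:
        """Send whatever is buffered (the real client also flushes on a timer)."""
        if self.current:
            self.sent.append(self.current)
            self.current, self.current_bytes = [], 0
```

Raising the limits produces fewer, larger requests (better throughput), while the latency limit bounds how long a lone message can sit in a partially filled batch.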

Subscriber Optimization

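# Optimize subscriber throughput
from concurrent import futures

from google.cloud import pubsub_v1

# Exactly-once delivery is a subscription setting (enable_exactly_once_delivery),
# not a client option
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path('my-project', 'my-subscription')

# Flow control caps unacknowledged messages and bytes held by the client;
# it stops pulling until outstanding messages are acked or nacked
flow_control = pubsub_v1.types.FlowControl(
    max_messages=1000,
    max_bytes=100 * 1024 * 1024,  # 100MB
)

# Callbacks run on this thread pool: max_workers sets processing parallelism
executor = futures.ThreadPoolExecutor(max_workers=10)
scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(executor)

def callback(message):
    """Process one message; nack on failure so it is redelivered."""
    try:
        print(f'Processing: {message.data}')  # Replace with real processing
        message.ack()
    except Exception as e:
        print(f'Error processing message: {e}')
        message.nack()

streaming_pull_future = subscriber.subscribe(
    subscription_path,
    callback=callback,
    flow_control=flow_control,
    scheduler=scheduler,
)

with subscriber:
    try:
        streaming_pull_future.result(timeout=300)
    except futures.TimeoutError:
        streaming_pull_future.cancel()
        streaming_pull_future.result()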
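Subscriber flow control bounds the number of outstanding (received but not yet acknowledged) messages. The effect can be modeled with a semaphore. This sketch assumes each message is handled on a worker thread and its slot is released when handling finishes; run_with_flow_control is hypothetical, and the real client also caps bytes and pauses the stream instead of blocking a loop.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable


def run_with_flow_control(
    messages: Iterable[int],
    handler: Callable[[int], None],
    max_outstanding: int,
    workers: int = 8,
) -> int:
    """Process messages with at most max_outstanding in flight; return the peak observed."""
    slots = threading.BoundedSemaphore(max_outstanding)
    lock = threading.Lock()
    in_flight = 0
    peak = 0

    def work(msg: int) -> None:
        nonlocal in_flight
        try:
            handler(msg)
        finally:
            with lock:
                in_flight -= 1
            slots.release()  # models the ack freeing a flow-control slot

    with ThreadPoolExecutor(max_workers=workers) as pool:
        for msg in messages:
            slots.acquire()  # "pulling" blocks while the limit is reached
            with lock:
                in_flight += 1
                peak = max(peak, in_flight)
            pool.submit(work, msg)
    return peak
```

Even with more worker threads than slots, no more than max_outstanding messages are ever in flight, which is how flow control protects memory when processing falls behind.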

Monitoring and Alerting

Key Metrics

# Monitor Pub/Sub metrics
import time

from google.cloud import monitoring_v3

client = monitoring_v3.MetricServiceClient()
project_name = "projects/my-project"

# Key metrics to monitor:
# - pubsub.googleapis.com/subscription/num_undelivered_messages (backlog size)
# - pubsub.googleapis.com/subscription/oldest_unacked_message_age (backlog age)
# - pubsub.googleapis.com/subscription/dead_letter_message_count
# - pubsub.googleapis.com/topic/send_request_count

def get_subscription_metrics(subscription_id, hours=1):
    """Print the undelivered-message count for one subscription."""
    end = int(time.time())
    # TimeInterval fields are Timestamps; pass seconds rather than a float
    interval = monitoring_v3.TimeInterval(
        {
            'end_time': {'seconds': end},
            'start_time': {'seconds': end - hours * 3600},
        }
    )

    results = client.list_time_series(
        request={
            'name': project_name,
            'filter': (
                'metric.type="pubsub.googleapis.com/subscription/num_undelivered_messages" '
                f'AND resource.label.subscription_id="{subscription_id}"'
            ),
            'interval': interval,
            'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL
        }
    )

    for result in results:
        print(f'Metric: {result.metric.type}')
        for point in result.points:
            print(f'  Value: {point.value.int64_value}')
            print(f'  Time: {point.interval.end_time}')

Alerting Policies

# Create alerting policy
from google.cloud import monitoring_v3

client = monitoring_v3.AlertPolicyServiceClient()
project_name = "projects/my-project"

alert_policy = monitoring_v3.AlertPolicy(
    display_name="Pub/Sub High Lag Alert",
    # How to combine conditions (only one condition here)
    combiner=monitoring_v3.AlertPolicy.ConditionCombinerType.OR,
    conditions=[
        monitoring_v3.AlertPolicy.Condition(
            display_name="High message lag",
            condition_threshold=monitoring_v3.AlertPolicy.Condition.MetricThreshold(
                filter=(
                    'metric.type="pubsub.googleapis.com/subscription/oldest_unacked_message_age" '
                    'AND resource.type="pubsub_subscription"'
                ),
                comparison=monitoring_v3.ComparisonType.COMPARISON_GT,
                threshold_value=300,  # 5 minutes
                duration={"seconds": 300},
                aggregations=[
                    monitoring_v3.Aggregation(
                        alignment_period={"seconds": 60},
                        per_series_aligner=monitoring_v3.Aggregation.Aligner.ALIGN_MEAN
                    )
                ]
            )
        )
    ],
    notification_channels=[],  # Add channel resource names; with none, nobody is notified
    alert_strategy=monitoring_v3.AlertPolicy.AlertStrategy(
        auto_close={"seconds": 1800}  # Auto-close after 30 minutes
    )
)

response = client.create_alert_policy(
    request={"name": project_name, "alert_policy": alert_policy}
)
print(f'Created alert policy: {response.name}')
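A threshold condition with a duration fires only when the aligned series stays above the threshold for the whole window, which filters out brief spikes. A simplified sketch, assuming one aligned value per 60-second alignment period as in the policy above; condition_met is hypothetical, and Cloud Monitoring's actual evaluation has more nuance (for example, handling of missing data).

```python
from typing import Sequence


def condition_met(
    aligned_values: Sequence[float],
    threshold: float = 300,
    duration_seconds: int = 300,
    alignment_period_seconds: int = 60,
) -> bool:
    """Return True if every point in the most recent duration window exceeds threshold."""
    # Number of consecutive aligned points that must breach the threshold
    needed = max(1, duration_seconds // alignment_period_seconds)
    if len(aligned_values) < needed:
        return False
    return all(v > threshold for v in aligned_values[-needed:])
```

With these values, the oldest unacked message must stay older than 5 minutes for 5 consecutive one-minute points before an incident opens.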

Best Practices

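  1. Use message ordering - When message sequence matters within an entity; prefer fine-grained ordering keys
  2. Implement dead-letter topics - Capture messages that repeatedly fail processing
  3. Enable exactly-once delivery - For critical transactions on pull subscriptions; keep processing idempotent
  4. Use message filtering - Reduce unnecessary message processing
  5. Monitor lag metrics - Track subscription backlog and oldest unacked message age
  6. Tune batch settings and flow control - Balance throughput and latency
  7. Implement proper error handling - Ack only after successful processing; nack to request redelivery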