Module: Google::Cloud::Pubsub
- Defined in:
- lib/google/cloud/pubsub.rb,
lib/google/cloud/pubsub/topic.rb,
lib/google/cloud/pubsub/policy.rb,
lib/google/cloud/pubsub/convert.rb,
lib/google/cloud/pubsub/message.rb,
lib/google/cloud/pubsub/project.rb,
lib/google/cloud/pubsub/service.rb,
lib/google/cloud/pubsub/version.rb,
lib/google/cloud/pubsub/snapshot.rb,
lib/google/cloud/pubsub/subscriber.rb,
lib/google/cloud/pubsub/topic/list.rb,
lib/google/cloud/pubsub/credentials.rb,
lib/google/cloud/pubsub/subscription.rb,
lib/google/cloud/pubsub/snapshot/list.rb,
lib/google/cloud/pubsub/publish_result.rb,
lib/google/cloud/pubsub/async_publisher.rb,
lib/google/cloud/pubsub/batch_publisher.rb,
lib/google/cloud/pubsub/received_message.rb,
lib/google/cloud/pubsub/subscriber/stream.rb,
lib/google/cloud/pubsub/subscription/list.rb,
lib/google/cloud/pubsub/v1/publisher_client.rb,
lib/google/cloud/pubsub/v1/subscriber_client.rb,
lib/google/cloud/pubsub/subscriber/async_pusher.rb,
lib/google/cloud/pubsub/subscriber/enumerator_queue.rb
Overview
Google Cloud Pub/Sub
Google Cloud Pub/Sub is designed to provide reliable, many-to-many, asynchronous messaging between applications. Publisher applications can send messages to a "topic" and other applications can subscribe to that topic to receive the messages. By decoupling senders and receivers, Google Cloud Pub/Sub allows developers to communicate between independently written applications.
The goal of google-cloud is to provide an API that is comfortable to Rubyists. Authentication is handled by #pubsub. You can provide the project and credential information to connect to the Pub/Sub service, or if you are running on Google Compute Engine this configuration is taken care of for you.
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic "my-topic"
topic.publish "task completed"
To learn more about Pub/Sub, read the Google Cloud Pub/Sub Overview.
Retrieving Topics
A Topic is a named resource to which messages are sent by publishers. A Topic is found by its name. (See Project#topic)
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic "my-topic"
Creating a Topic
A Topic is created from a Project. (See Project#create_topic)
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
topic = pubsub.create_topic "my-topic"
Retrieving Subscriptions
A Subscription is a named resource representing the stream of messages from a single, specific Topic, to be delivered to the subscribing application. A Subscription is found by its name. (See Topic#subscription)
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic "my-topic"
subscription = topic.subscription "my-topic-subscription"
puts subscription.name
Creating a Subscription
A Subscription is created from a Topic. (See Topic#subscribe and Project#subscribe)
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic "my-topic"
sub = topic.subscribe "my-topic-sub"
puts sub.name # => "my-topic-sub"
A subscription can also be created with an acknowledgement deadline, in seconds, and an endpoint URL to which messages are pushed:
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic "my-topic"
sub = topic.subscribe "my-topic-sub",
                      deadline: 120,
                      endpoint: "https://example.com/push"
Publishing Messages
Messages are published to a topic. Any message published to a topic without a subscription will be lost. Ensure the topic has a subscription before publishing. (See Topic#publish and Project#publish)
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic "my-topic"
msg = topic.publish "task completed"
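The warning above about topics without subscriptions can be illustrated with a small in-memory model. This is only a sketch: `ToyTopic` and its methods are hypothetical stand-ins, not part of google-cloud-pubsub, and the model assumes that a message is copied only to the subscriptions that exist at publish time.

```ruby
# Toy in-memory model (NOT the google-cloud-pubsub API). Each subscription
# keeps its own backlog, and publishing copies the message into the backlog
# of every subscription that exists at that moment.
class ToyTopic
  def initialize
    @backlogs = {}
  end

  # Register a named subscription with an empty backlog.
  def subscribe name
    @backlogs[name] ||= []
  end

  # Deliver data to every current subscription and return how many got it.
  def publish data
    @backlogs.each_value { |backlog| backlog << data }
    @backlogs.size
  end

  # Return the messages waiting on the named subscription.
  def backlog name
    @backlogs.fetch name
  end
end

topic = ToyTopic.new
lost = topic.publish "published too early" # no subscriptions exist yet
topic.subscribe "my-topic-sub"
kept = topic.publish "task completed"

puts lost                                  # prints 0
puts topic.backlog("my-topic-sub").inspect # prints ["task completed"]
```

In this model the first message reaches no backlog at all, which is why the subscription should be created before publishing.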
Messages can also be published with attributes:
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic "my-topic"
msg = topic.publish "task completed",
                    foo: :bar,
                    this: :that
Messages can also be published in batches asynchronously using publish_async. (See Topic#publish_async and AsyncPublisher)
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic "my-topic"
topic.publish_async "task completed" do |result|
  if result.succeeded?
    log_publish_success result.data
  else
    log_publish_failure result.data, result.error
  end
end
topic.async_publisher.stop.wait!
Or multiple messages can be published in batches at the same time by passing a block to publish. (See BatchPublisher)
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic "my-topic"
msgs = topic.publish do |batch|
  batch.publish "task 1 completed", foo: :bar
  batch.publish "task 2 completed", foo: :baz
  batch.publish "task 3 completed", foo: :bif
end
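The block form above follows a common Ruby pattern: yield a collector object, let the caller queue messages on it, then send everything in one request. The sketch below illustrates that pattern only. `ToyBatch` and `publish_batch` are hypothetical names, and nothing here reflects how BatchPublisher is actually implemented.

```ruby
# Hypothetical collector used for illustration; not the library's
# BatchPublisher.
class ToyBatch
  attr_reader :messages

  def initialize
    @messages = []
  end

  # Queue a message with optional attributes instead of sending it now.
  def publish data, **attributes
    @messages << { data: data, attributes: attributes }
  end
end

# Yield a collector to the caller, then hand back everything it queued so
# that the messages can be sent together in a single (simulated) request.
def publish_batch
  batch = ToyBatch.new
  yield batch
  batch.messages
end

queued = publish_batch do |batch|
  batch.publish "task 1 completed", foo: :bar
  batch.publish "task 2 completed", foo: :baz
end

puts queued.size # prints 2
```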
Receiving messages
Messages can be streamed from a subscription with a subscriber object that is created using listen. (See Subscription#listen and Subscriber)
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-topic-sub"
subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end
# Start background threads that will call the block passed to listen.
subscriber.start
# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop.wait!
Messages can also be pulled directly in a one-time operation. (See Subscription#pull)
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-topic-sub"
received_messages = sub.pull
A maximum number of messages to pull can be specified:
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-topic-sub"
received_messages = sub.pull max: 10
Acknowledging a Message
Messages that are received can be acknowledged in Pub/Sub, marking the message to be removed so it cannot be pulled again.
A Message that can be acknowledged is called a ReceivedMessage. ReceivedMessages can be acknowledged one at a time: (See ReceivedMessage#acknowledge!)
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-topic-sub"
subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end
# Start background threads that will call the block passed to listen.
subscriber.start
# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop.wait!
Or, multiple messages can be acknowledged in a single API call: (See Subscription#acknowledge)
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-topic-sub"
received_messages = sub.pull
sub.acknowledge received_messages
Modifying a Deadline
A message must be acknowledged after it is pulled, or Pub/Sub will mark the message for redelivery. The message acknowledgement deadline can be delayed if more time is needed. This will allow more time to process the message before the message is marked for redelivery. (See ReceivedMessage#delay!)
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-topic-sub"
subscriber = sub.listen do |received_message|
  puts received_message.message.data
  # Delay for 2 minutes
  received_message.delay! 120
end
# Start background threads that will call the block passed to listen.
subscriber.start
# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop.wait!
The message can also be made available for immediate redelivery:
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-topic-sub"
subscriber = sub.listen do |received_message|
  puts received_message.message.data
  # Mark for redelivery
  received_message.reject!
end
# Start background threads that will call the block passed to listen.
subscriber.start
# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop.wait!
Multiple messages can be delayed or made available for immediate redelivery: (See Subscription#delay)
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-topic-sub"
received_messages = sub.pull
sub.delay 120, received_messages
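The interplay of pulling, acknowledging, delaying and rejecting can be sketched with a toy model that uses plain integers as timestamps. `ToySubscription` is a hypothetical class, not the library's Subscription, and treating a rejection as a delay of zero seconds is an assumption of this sketch.

```ruby
# Toy model of acknowledgement deadlines (NOT the library's implementation).
# Time is passed in as an integer number of seconds so the run is repeatable.
class ToySubscription
  def initialize deadline: 10
    @deadline = deadline
    @backlog = {} # ack_id => data
    @leases = {}  # ack_id => time at which the message may be redelivered
    @next_id = 0
  end

  def enqueue data
    @backlog[@next_id += 1] = data
  end

  # Return [ack_id, data] pairs that are not currently leased, and lease them
  # until now + deadline.
  def pull now
    available = @backlog.reject { |id, _| @leases.fetch(id, 0) > now }
    available.each_key { |id| @leases[id] = now + @deadline }
    available.to_a
  end

  # Acknowledge: remove the message so it is never delivered again.
  def acknowledge ack_id
    @backlog.delete ack_id
    @leases.delete ack_id
  end

  # Delay: move the deadline to `seconds` from now. Zero makes the message
  # available again immediately (this sketch's stand-in for a rejection).
  def delay ack_id, seconds, now
    @leases[ack_id] = now + seconds if @backlog.key? ack_id
  end
end

sub = ToySubscription.new deadline: 10
sub.enqueue "task completed"

ack_id, _data = sub.pull(0).first
during_lease = sub.pull 5    # still leased, nothing is delivered
sub.delay ack_id, 120, 5     # extend the lease to t = 125
after_delay = sub.pull 20    # original deadline passed, lease still holds
sub.delay ack_id, 0, 20      # make it available right away
redelivered = sub.pull 20
sub.acknowledge ack_id
after_ack = sub.pull 1000    # acknowledged messages never come back

p during_lease, after_delay, redelivered, after_ack
```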
Creating a snapshot and using seek
You can create a snapshot to retain the existing backlog on a subscription. The snapshot will hold the messages in the subscription's backlog that are unacknowledged upon the successful completion of the create_snapshot operation.
Later, you can use seek to reset the subscription's backlog to the snapshot.
(See Subscription#create_snapshot and Subscription#seek)
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-topic-sub"
snapshot = sub.create_snapshot
received_messages = sub.pull
sub.acknowledge received_messages
sub.seek snapshot
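What the snapshot and seek steps do to the backlog can be shown with a small in-memory model. `ToyBacklog` is hypothetical and only mirrors the behaviour described above: a snapshot captures the unacknowledged messages, and seeking restores them even after they have been acknowledged.

```ruby
# Toy model of snapshot and seek (NOT the library's implementation).
class ToyBacklog
  def initialize messages
    @unacked = messages.dup
  end

  # Return a copy of the messages that are still unacknowledged.
  def unacked
    @unacked.dup
  end

  # Remove the given messages from the backlog.
  def acknowledge *messages
    @unacked -= messages
  end

  # A snapshot is a frozen copy of what is unacknowledged right now.
  def create_snapshot
    @unacked.dup.freeze
  end

  # Seeking resets the backlog to the contents of the snapshot.
  def seek snapshot
    @unacked = snapshot.dup
  end
end

backlog = ToyBacklog.new ["task 1", "task 2"]
snapshot = backlog.create_snapshot

backlog.acknowledge "task 1", "task 2"
after_ack = backlog.unacked

backlog.seek snapshot
after_seek = backlog.unacked

p after_ack  # prints []
p after_seek # prints ["task 1", "task 2"]
```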
Listening for Messages
A subscriber object can be created using listen, which streams messages from the backend and processes them as they are received. (See Subscription#listen and Subscriber)
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-topic-sub"
subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end
# Start background threads that will call the block passed to listen.
subscriber.start
# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop.wait!
The subscriber object can be configured to control the number of concurrent streams to open, the number of received messages to be collected, and the number of threads each stream opens for concurrent calls made to handle the received messages.
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-topic-sub"
subscriber = sub.listen threads: { callback: 16 } do |received_message|
  # store the message somewhere before acknowledging
  store_in_backend received_message.data # takes a few seconds
  received_message.acknowledge!
end
# Start background threads that will call the block passed to listen.
subscriber.start
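The value of several callback threads, when each message takes a while to handle, can be sketched with Ruby's standard Queue and Thread classes. This is a generic worker-pool illustration under assumed names (`process_concurrently`, `callback_threads`), not the Subscriber's actual threading code.

```ruby
# Run `handler` for every message using a fixed number of worker threads.
# Hypothetical helper for illustration only.
def process_concurrently messages, callback_threads: 4, &handler
  queue = Queue.new
  messages.each { |message| queue << message }
  queue.close # once drained, a closed queue returns nil from pop

  workers = Array.new(callback_threads) do
    Thread.new do
      while (message = queue.pop)
        handler.call message
      end
    end
  end
  workers.each(&:join)
end

handled = Queue.new # thread-safe collector for the results
process_concurrently(%w[a b c d e f], callback_threads: 3) do |message|
  sleep 0.01 # stand-in for slow work such as writing to a backend
  handled << message.upcase
end

results = []
results << handled.pop until handled.empty?
p results.sort
```

With three workers the six simulated tasks overlap instead of running one after another, which is the effect a larger callback thread count aims for.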
Working Across Projects
All calls to the Pub/Sub service use the same project and credentials provided to the #pubsub method. However, it is common to reference topics or subscriptions in other projects, which can be achieved by using the project option. The main credentials must have permissions to the topics and subscriptions in other projects.
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new # my-project-id
# Get a topic in the current project
my_topic = pubsub.topic "my-topic"
my_topic.name #=> "projects/my-project-id/topics/my-topic"
# Get a topic in another project
other_topic = pubsub.topic "other-topic", project: "other-project-id"
other_topic.name #=> "projects/other-project-id/topics/other-topic"
It is possible to create a subscription in the current project that pulls from a topic in another project:
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new # my-project-id
# Get a topic in another project
topic = pubsub.topic "other-topic", project: "other-project-id"
# Create a subscription in the current project that pulls from
# the topic in another project
sub = topic.subscribe "my-sub"
sub.name #=> "projects/my-project-id/subscriptions/my-sub"
sub.topic.name #=> "projects/other-project-id/topics/other-topic"
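The fully qualified names in the comments above follow a fixed pattern, which the sketch below reproduces. `topic_path`, `subscription_path` and `DEFAULT_PROJECT` are hypothetical names used for illustration and are not library methods.

```ruby
# Hypothetical default, standing in for the project given to Pubsub.new.
DEFAULT_PROJECT = "my-project-id"

# Build a fully qualified topic name in the given (or default) project.
def topic_path name, project: DEFAULT_PROJECT
  "projects/#{project}/topics/#{name}"
end

# Build a fully qualified subscription name in the given (or default) project.
def subscription_path name, project: DEFAULT_PROJECT
  "projects/#{project}/subscriptions/#{name}"
end

puts topic_path("my-topic")
# prints projects/my-project-id/topics/my-topic
puts topic_path("other-topic", project: "other-project-id")
# prints projects/other-project-id/topics/other-topic
puts subscription_path("my-sub")
# prints projects/my-project-id/subscriptions/my-sub
```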
Using the Google Cloud Pub/Sub Emulator
To develop and test your application locally, you can use the Google Cloud Pub/Sub Emulator, which provides local emulation of the production Google Cloud Pub/Sub environment. You can start the Google Cloud Pub/Sub emulator using the gcloud command-line tool.
To configure your Ruby code to use the emulator, set the PUBSUB_EMULATOR_HOST environment variable to the host and port where the emulator is running. The value can be set as an environment variable in the shell running the Ruby code, or can be set directly in the Ruby code as shown below.
require "google/cloud/pubsub"
# Make Pub/Sub use the emulator
ENV["PUBSUB_EMULATOR_HOST"] = "localhost:8918"
pubsub = Google::Cloud::Pubsub.new project: "emulator-project-id"
# Create a topic in the emulator project
my_topic = pubsub.create_topic "my-topic"
my_topic.name #=> "projects/emulator-project-id/topics/my-topic"
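The order in which the emulator host is chosen can be sketched as a standalone function: an explicit emulator_host argument wins, then the PUBSUB_EMULATOR_HOST environment variable, and otherwise the regular service is used. `connection_target` and the hash it returns are hypothetical. Only the precedence and the project check are taken from the .new source listed on this page.

```ruby
# Sketch of the selection logic only; it opens no connection. The `env`
# parameter is an assumption added so the function can be tried with a
# plain hash instead of the real environment.
def connection_target project:, emulator_host: nil, env: ENV
  project = project.to_s # always cast to a string
  raise ArgumentError, "project is missing" if project.empty?

  emulator_host ||= env["PUBSUB_EMULATOR_HOST"]
  if emulator_host
    # The emulator is reached over an insecure channel on the given host.
    { project: project, host: emulator_host, secure: false }
  else
    { project: project, host: :default, secure: true }
  end
end

from_env = connection_target(
  project: "emulator-project-id",
  env: { "PUBSUB_EMULATOR_HOST" => "localhost:8918" }
)
explicit = connection_target(
  project: "emulator-project-id",
  emulator_host: "localhost:9000",
  env: { "PUBSUB_EMULATOR_HOST" => "localhost:8918" }
)
production = connection_target project: "my-project-id", env: {}

p from_env[:host]     # prints "localhost:8918"
p explicit[:host]     # prints "localhost:9000"
p production[:secure] # prints true
```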
Defined Under Namespace
Modules: V1 Classes: AsyncPublisher, BatchPublisher, Message, Policy, Project, PublishResult, ReceivedMessage, Snapshot, Subscriber, Subscription, Topic
Constant Summary
- VERSION = "0.27.2"
Class Method Summary
- .new(project: nil, keyfile: nil, scope: nil, timeout: nil, client_config: nil, emulator_host: nil) ⇒ Google::Cloud::Pubsub::Project
Creates a new object for connecting to the Pub/Sub service.
Class Method Details
.new(project: nil, keyfile: nil, scope: nil, timeout: nil, client_config: nil, emulator_host: nil) ⇒ Google::Cloud::Pubsub::Project
Creates a new object for connecting to the Pub/Sub service. Each call creates a new connection.
For more information on connecting to Google Cloud see the Authentication Guide.
# File 'lib/google/cloud/pubsub.rb', line 520
def self.new project: nil, keyfile: nil, scope: nil, timeout: nil,
             client_config: nil, emulator_host: nil
  project ||= Google::Cloud::Pubsub::Project.default_project
  project = project.to_s # Always cast to a string
  fail ArgumentError, "project is missing" if project.empty?

  emulator_host ||= ENV["PUBSUB_EMULATOR_HOST"]
  if emulator_host
    ps = Google::Cloud::Pubsub::Project.new(
      Google::Cloud::Pubsub::Service.new(
        project, :this_channel_is_insecure))
    ps.service.host = emulator_host
    return ps
  end
  credentials = credentials_with_scope keyfile, scope
  Google::Cloud::Pubsub::Project.new(
    Google::Cloud::Pubsub::Service.new(
      project, credentials, timeout: timeout,
                            client_config: client_config))
end