Module: Google::Cloud::Pubsub

Defined in:
lib/google/cloud/pubsub.rb,
lib/google/cloud/pubsub/topic.rb,
lib/google/cloud/pubsub/policy.rb,
lib/google/cloud/pubsub/message.rb,
lib/google/cloud/pubsub/project.rb,
lib/google/cloud/pubsub/service.rb,
lib/google/cloud/pubsub/version.rb,
lib/google/cloud/pubsub/topic/list.rb,
lib/google/cloud/pubsub/credentials.rb,
lib/google/cloud/pubsub/subscription.rb,
lib/google/cloud/pubsub/topic/publisher.rb,
lib/google/cloud/pubsub/received_message.rb,
lib/google/cloud/pubsub/subscription/list.rb,
lib/google/cloud/pubsub/v1/publisher_client.rb,
lib/google/cloud/pubsub/v1/subscriber_client.rb

Overview

Google Cloud Pub/Sub

Google Cloud Pub/Sub is designed to provide reliable, many-to-many, asynchronous messaging between applications. Publisher applications can send messages to a "topic" and other applications can subscribe to that topic to receive the messages. By decoupling senders and receivers, Google Cloud Pub/Sub allows developers to communicate between independently written applications.

The goal of google-cloud is to provide an API that is comfortable to Rubyists. Authentication is handled by Google::Cloud::Pubsub.new. You can provide the project and credential information to connect to the Pub/Sub service, or if you are running on Google Compute Engine this configuration is taken care of for you.

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
topic.publish "task completed"

To learn more about Pub/Sub, read the Google Cloud Pub/Sub Overview.

Retrieving Topics

A Topic is a named resource to which messages are sent by publishers. A Topic is found by its name. (See Project#topic)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic "my-topic"

Creating a Topic

A Topic is created from a Project. (See Project#create_topic)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
topic = pubsub.create_topic "my-topic"

Retrieving Subscriptions

A Subscription is a named resource representing the stream of messages from a single, specific Topic, to be delivered to the subscribing application. A Subscription is found by its name. (See Topic#subscription)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
subscription = topic.subscription "my-topic-subscription"
puts subscription.name

Creating a Subscription

A Subscription is created from a Topic. (See Topic#subscribe and Project#subscribe)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
sub = topic.subscribe "my-topic-sub"
puts sub.name # => "my-topic-sub"

A subscription can also be created with an acknowledgement deadline (the number of seconds Pub/Sub waits for a message to be acknowledged) and an endpoint URL to which messages are pushed:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
sub = topic.subscribe "my-topic-sub",
                      deadline: 120,
                      endpoint: "https://example.com/push"

Publishing Messages

Messages are published to a topic. Any message published to a topic without a subscription will be lost. Ensure the topic has a subscription before publishing. (See Topic#publish and Project#publish)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
msg = topic.publish "new-message"

Messages can also be published with attributes:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
msg = topic.publish "new-message",
                    foo: :bar,
                    this: :that
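
The Pub/Sub API defines message attributes as a map of string keys to string values, so symbols like those above are presumably carried as strings. The sketch below illustrates that conversion in plain Ruby. `normalize_attributes` is a hypothetical helper written for this illustration, not a method of the gem, and it makes no claim about how the gem performs the conversion internally.

```ruby
# Hypothetical helper: coerce attribute keys and values to strings,
# matching the string-to-string map that Pub/Sub messages carry.
def normalize_attributes(attributes)
  attributes.each_with_object({}) do |(key, value), result|
    result[key.to_s] = value.to_s
  end
end

attrs = normalize_attributes(foo: :bar, this: :that)
puts attrs.keys.inspect
```

Normalizing up front like this can make it easier to compare what was published with what a subscriber later reads back.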

Multiple messages can be published at the same time by passing a block:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
msgs = topic.publish do |batch|
  batch.publish "new-message-1", foo: :bar
  batch.publish "new-message-2", foo: :baz
  batch.publish "new-message-3", foo: :bif
end

Pulling Messages

Messages are pulled from a Subscription. (See Subscription#pull)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
msgs = sub.pull

A maximum number of messages returned can also be specified:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
msgs = sub.pull max: 10

The request for messages can also block until messages are available. (See Subscription#wait_for_messages)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
msgs = sub.wait_for_messages

Acknowledging a Message

Messages that are received can be acknowledged in Pub/Sub, marking the message to be removed so it cannot be pulled again.

A Message that can be acknowledged is called a ReceivedMessage. ReceivedMessages can be acknowledged one at a time: (See ReceivedMessage#acknowledge!)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
sub.pull.each { |msg| msg.acknowledge! }

Or, multiple messages can be acknowledged in a single API call: (See Subscription#acknowledge)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
received_messages = sub.pull
sub.acknowledge received_messages

Modifying a Deadline

A message must be acknowledged after it is pulled, or Pub/Sub will mark the message for redelivery. The message acknowledgement deadline can be delayed if more time is needed. This will allow more time to process the message before the message is marked for redelivery. (See ReceivedMessage#delay!)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
received_message = sub.pull.first
if received_message
  puts received_message.message.data
  # Delay for 2 minutes
  received_message.delay! 120
end

The message can also be made available for immediate redelivery:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
received_message = sub.pull.first
if received_message
  puts received_message.message.data
  # Mark for redelivery by setting the deadline to now
  received_message.delay! 0
end

Multiple messages can be delayed or made available for immediate redelivery: (See Subscription#delay)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
received_messages = sub.pull
sub.delay 120, received_messages

Listening for Messages

Long-running workers are easy to create with listen, which runs an infinitely blocking loop to process messages as they are received. (See Subscription#listen)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
sub.listen do |msg|
  # process msg
end

Messages are retrieved in batches for efficiency. The number of messages pulled per batch can be limited with the max option:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
sub.listen max: 20 do |msg|
  # process msg
end

When processing time and the acknowledgement deadline are a concern, messages can be automatically acknowledged as they are pulled with the autoack option:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
sub.listen autoack: true do |msg|
  # process msg
end
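
To make the batching and autoack behavior described above concrete, here is a conceptual sketch using an in-memory stand-in. `FakeSubscription` and its `drain` method are hypothetical names invented for this illustration. They are not the gem's Subscription class, they make no network calls, and the loop returns when the queue is empty, whereas a real listener keeps blocking.

```ruby
# In-memory stand-in used only for illustration; it is not the gem's
# Subscription class and makes no network calls.
class FakeSubscription
  attr_reader :acknowledged

  def initialize(messages)
    @pending = messages.dup
    @acknowledged = []
  end

  # Return up to `max` pending messages. With autoack, the batch is
  # recorded as acknowledged at pull time, before it is processed.
  def pull(max: 100, autoack: false)
    batch = @pending.shift(max)
    @acknowledged.concat(batch) if autoack
    batch
  end

  # Pull batches and yield each message until nothing is pending.
  def drain(max: 100, autoack: false)
    loop do
      batch = pull(max: max, autoack: autoack)
      break if batch.empty?

      batch.each { |msg| yield msg }
    end
  end
end

sub = FakeSubscription.new(%w[a b c d e])
sub.drain(max: 2, autoack: true) { |msg| puts "processing #{msg}" }
```

Because autoack acknowledges at pull time in this sketch, a message whose processing fails would not be redelivered. That trade-off is worth weighing before enabling the option.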

Configuring Timeout

You can configure the request timeout value in seconds.

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new timeout: 120

Working Across Projects

All calls to the Pub/Sub service use the same project and credentials provided to Google::Cloud::Pubsub.new. However, it is common to reference topics or subscriptions in other projects, which can be achieved by using the project option. The main credentials must have permissions to the topics and subscriptions in other projects.

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new # my-project-id

# Get a topic in the current project
my_topic = pubsub.topic "my-topic"
my_topic.name #=> "projects/my-project-id/topics/my-topic"
# Get a topic in another project
other_topic = pubsub.topic "other-topic", project: "other-project-id"
other_topic.name #=> "projects/other-project-id/topics/other-topic"

It is possible to create a subscription in the current project that pulls from a topic in another project:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new # my-project-id

# Get a topic in another project
topic = pubsub.topic "other-topic", project: "other-project-id"
# Create a subscription in the current project that pulls from
# the topic in another project
sub = topic.subscribe "my-sub"
sub.name #=> "projects/my-project-id/subscriptions/my-sub"
sub.topic.name #=> "projects/other-project-id/topics/other-topic"
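
The fully qualified names in the comments above follow a fixed pattern. The sketch below reproduces that pattern in plain Ruby. `topic_path` and `subscription_path` are hypothetical helpers defined here for illustration only, and the project identifiers are the placeholder values from the examples.

```ruby
# Hypothetical helpers that build fully qualified Pub/Sub resource names
# in the format shown by the examples above.
def topic_path(topic_name, project:)
  "projects/#{project}/topics/#{topic_name}"
end

def subscription_path(subscription_name, project:)
  "projects/#{project}/subscriptions/#{subscription_name}"
end

current_project = "my-project-id"

# A topic in another project keeps that project's identifier, while a
# subscription created from it is named under the current project.
puts topic_path("other-topic", project: "other-project-id")
puts subscription_path("my-sub", project: current_project)
```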

Using the Google Cloud Pub/Sub Emulator

To develop and test your application locally, you can use the Google Cloud Pub/Sub Emulator, which provides local emulation of the production Google Cloud Pub/Sub environment. You can start the Google Cloud Pub/Sub emulator using the gcloud command-line tool.

To configure your Ruby code to use the emulator, set the PUBSUB_EMULATOR_HOST environment variable to the host and port where the emulator is running. The value can be set as an environment variable in the shell running the Ruby code, or can be set directly in the Ruby code as shown below.

require "google/cloud/pubsub"

# Make Pub/Sub use the emulator
ENV["PUBSUB_EMULATOR_HOST"] = "localhost:8918"

pubsub = Google::Cloud::Pubsub.new project: "emulator-project-id"

# Create a topic in the emulator project
my_topic = pubsub.new_topic "my-topic"
my_topic.name #=> "projects/emulator-project-id/topics/my-topic"
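
Since the emulator is selected purely by an environment variable, it can help to check the value before connecting. The following is a minimal sketch, assuming the variable holds a plain `host:port` pair as described above. `emulator_address` is a hypothetical helper, not part of the gem, and it does not handle IPv6 literals.

```ruby
# Hypothetical helper: split a "host:port" emulator address into its parts.
# Returns nil when the variable is unset or empty.
def emulator_address(env = ENV)
  value = env["PUBSUB_EMULATOR_HOST"].to_s.strip
  return nil if value.empty?

  host, port = value.split(":", 2)
  { host: host, port: port && Integer(port, 10) }
end

address = emulator_address
if address
  puts "Using Pub/Sub emulator at #{address[:host]}, port #{address[:port]}"
else
  puts "PUBSUB_EMULATOR_HOST is not set"
end
```

A check like this at startup can guard a test suite against accidentally talking to the production service.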

Defined Under Namespace

Modules: V1

Classes: Message, Policy, Project, ReceivedMessage, Subscription, Topic

Constant Summary

VERSION = "0.22.0"

Class Method Details

.new(project: nil, keyfile: nil, scope: nil, timeout: nil, client_config: nil) ⇒ Google::Cloud::Pubsub::Project

Creates a new object for connecting to the Pub/Sub service. Each call creates a new connection.

For more information on connecting to Google Cloud see the Authentication Guide.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
topic.publish "task completed"

Parameters:

  • project (String)

    Project identifier for the Pub/Sub service you are connecting to.

  • keyfile (String, Hash)

    Keyfile downloaded from Google Cloud. If a file path is given, the file must be readable.

  • scope (String, Array<String>)

    The OAuth 2.0 scopes controlling the set of resources and operations that the connection can access. See Using OAuth 2.0 to Access Google APIs.

    The default scope is:

    • https://www.googleapis.com/auth/pubsub

  • timeout (Integer)

    Default timeout to use in requests. Optional.

  • client_config (Hash)

    A hash of values to override the default behavior of the API client. Optional.

Returns:

  • (Google::Cloud::Pubsub::Project)

# File 'lib/google/cloud/pubsub.rb', line 441

def self.new project: nil, keyfile: nil, scope: nil, timeout: nil,
             client_config: nil
  project ||= Google::Cloud::Pubsub::Project.default_project
  project = project.to_s # Always cast to a string
  if ENV["PUBSUB_EMULATOR_HOST"]
    ps = Google::Cloud::Pubsub::Project.new(
      Google::Cloud::Pubsub::Service.new(
        project, :this_channel_is_insecure))
    ps.service.host = ENV["PUBSUB_EMULATOR_HOST"]
    return ps
  end
  if keyfile.nil?
    credentials = Google::Cloud::Pubsub::Credentials.default scope: scope
  else
    credentials = Google::Cloud::Pubsub::Credentials.new \
      keyfile, scope: scope
  end
  Google::Cloud::Pubsub::Project.new(
    Google::Cloud::Pubsub::Service.new(
      project, credentials, timeout: timeout,
                            client_config: client_config))
end
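
The branches in the listing above can be summarized in a small sketch. `connection_mode` is a hypothetical function written for this illustration. It returns a symbol naming the path taken and does not contact any service or load any credentials.

```ruby
# Hypothetical summary of the branches in the method above. It returns a
# symbol naming the path taken and does not contact any service.
def connection_mode(keyfile: nil, env: ENV)
  # The emulator check comes first, so it wins even when a keyfile is given.
  return :emulator if env["PUBSUB_EMULATOR_HOST"]
  return :default_credentials if keyfile.nil?

  :keyfile_credentials
end

puts connection_mode(env: {})
puts connection_mode(keyfile: "/path/to/keyfile.json", env: {})
```

Note that in the listing the emulator branch returns before `timeout` and `client_config` are used, so those options have no effect when PUBSUB_EMULATOR_HOST is set.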