Module: Google::Cloud::Pubsub

Defined in:
lib/google/cloud/pubsub.rb,
lib/google/cloud/pubsub/topic.rb,
lib/google/cloud/pubsub/policy.rb,
lib/google/cloud/pubsub/convert.rb,
lib/google/cloud/pubsub/message.rb,
lib/google/cloud/pubsub/project.rb,
lib/google/cloud/pubsub/service.rb,
lib/google/cloud/pubsub/version.rb,
lib/google/cloud/pubsub/snapshot.rb,
lib/google/cloud/pubsub/subscriber.rb,
lib/google/cloud/pubsub/topic/list.rb,
lib/google/cloud/pubsub/credentials.rb,
lib/google/cloud/pubsub/subscription.rb,
lib/google/cloud/pubsub/snapshot/list.rb,
lib/google/cloud/pubsub/publish_result.rb,
lib/google/cloud/pubsub/v1/credentials.rb,
lib/google/cloud/pubsub/async_publisher.rb,
lib/google/cloud/pubsub/batch_publisher.rb,
lib/google/cloud/pubsub/received_message.rb,
lib/google/cloud/pubsub/subscriber/stream.rb,
lib/google/cloud/pubsub/subscription/list.rb,
lib/google/cloud/pubsub/v1/publisher_client.rb,
lib/google/cloud/pubsub/subscriber/inventory.rb,
lib/google/cloud/pubsub/v1/subscriber_client.rb,
lib/google/cloud/pubsub/subscriber/enumerator_queue.rb,
lib/google/cloud/pubsub/subscriber/async_unary_pusher.rb,
lib/google/cloud/pubsub/subscriber/async_stream_pusher.rb

Overview

Google Cloud Pub/Sub

Google Cloud Pub/Sub is designed to provide reliable, many-to-many, asynchronous messaging between applications. Publisher applications can send messages to a "topic" and other applications can subscribe to that topic to receive the messages. By decoupling senders and receivers, Google Cloud Pub/Sub allows developers to communicate between independently written applications.

The goal of google-cloud is to provide an API that is comfortable to Rubyists. Your authentication credentials are detected automatically in Google Cloud Platform environments such as Google Compute Engine, Google App Engine and Google Kubernetes Engine. In other environments you can configure authentication easily, either directly in your code or via environment variables. Read more about the options for connecting in the Authentication Guide.

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
topic.publish "task completed"

To learn more about Pub/Sub, read the Google Cloud Pub/Sub Overview.

Enabling Logging

To enable logging for this library, set the logger for the underlying gRPC library. The logger that you set may be a Ruby stdlib Logger as shown below, or a Google::Cloud::Logging::Logger that will write logs to Stackdriver Logging. See grpc/logconfig.rb and the gRPC spec_helper.rb for additional information.

Configuring a Ruby stdlib logger:

require "logger"

module MyLogger
  LOGGER = Logger.new $stderr, level: Logger::WARN
  def logger
    LOGGER
  end
end

# Define a gRPC module-level logger method before grpc/logconfig.rb loads.
module GRPC
  extend MyLogger
end

Retrieving Topics

A Topic is a named resource to which messages are sent by publishers. A Topic is found by its name. (See Project#topic)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic "my-topic"

Creating a Topic

A Topic is created from a Project. (See Project#create_topic)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
topic = pubsub.create_topic "my-topic"

Retrieving Subscriptions

A Subscription is a named resource representing the stream of messages from a single, specific Topic, to be delivered to the subscribing application. A Subscription is found by its name. (See Topic#subscription)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
subscription = topic.subscription "my-topic-subscription"
puts subscription.name

Creating a Subscription

A Subscription is created from a Topic. (See Topic#subscribe and Project#subscribe)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
sub = topic.subscribe "my-topic-sub"
puts sub.name # => "my-topic-sub"

A subscription can also be created with an acknowledgement deadline, given in seconds, and an endpoint URL to which messages are pushed:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
sub = topic.subscribe "my-topic-sub",
                      deadline: 120,
                      endpoint: "https://example.com/push"

Publishing Messages

Messages are published to a topic. Any message published to a topic without a subscription will be lost. Ensure the topic has a subscription before publishing. (See Topic#publish and Project#publish)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
msg = topic.publish "task completed"

Messages can also be published with attributes:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
msg = topic.publish "task completed",
                    foo: :bar,
                    this: :that

Messages can also be published in batches asynchronously using publish_async. (See Topic#publish_async and AsyncPublisher)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
topic.publish_async "task completed" do |result|
  if result.succeeded?
    log_publish_success result.data
  else
    log_publish_failure result.data, result.error
  end
end

topic.async_publisher.stop.wait!
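The example above calls log_publish_success and log_publish_failure, which are placeholders for application code. A minimal sketch of what they might look like follows. The method names, the handle_publish_result wrapper, and the FakeResult stand-in are hypothetical; the only behavior taken from the example above is that a result responds to succeeded?, data, and error.

```ruby
require "logger"

PUBLISH_LOGGER = Logger.new $stderr

# Hypothetical callbacks matching the names used in the example above.
def log_publish_success data
  PUBLISH_LOGGER.info "published: #{data}"
end

def log_publish_failure data, error
  PUBLISH_LOGGER.error "failed to publish #{data}: #{error.message}"
end

# Hypothetical wrapper: logs the outcome and returns true on success.
def handle_publish_result result
  if result.succeeded?
    log_publish_success result.data
    true
  else
    log_publish_failure result.data, result.error
    false
  end
end

# Stand-in for a publish result so the sketch runs without network access.
FakeResult = Struct.new :data, :error do
  def succeeded?
    error.nil?
  end
end

ok = handle_publish_result FakeResult.new("task completed", nil)
failed = handle_publish_result FakeResult.new("task completed", RuntimeError.new("unavailable"))
```

With a real topic, the wrapper could be passed as the block: topic.publish_async("task completed") { |result| handle_publish_result result }.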

Or multiple messages can be published in batches at the same time by passing a block to publish. (See BatchPublisher)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
msgs = topic.publish do |batch|
  batch.publish "task 1 completed", foo: :bar
  batch.publish "task 2 completed", foo: :baz
  batch.publish "task 3 completed", foo: :bif
end

Receiving Messages

Messages can be streamed from a subscription with a subscriber object that is created using listen. (See Subscription#listen and Subscriber)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end

# Start background threads that will call the block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop.wait!

Messages can also be pulled directly in a one-time operation. (See Subscription#pull)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
received_messages = sub.pull

A maximum number of messages to pull can be specified:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
received_messages = sub.pull max: 10

Acknowledging a Message

Messages that are received can be acknowledged in Pub/Sub, marking the message to be removed so it cannot be pulled again.

A Message that can be acknowledged is called a ReceivedMessage. ReceivedMessages can be acknowledged one at a time: (See ReceivedMessage#acknowledge!)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end

# Start background threads that will call the block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop.wait!

Or, multiple messages can be acknowledged in a single API call: (See Subscription#acknowledge)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
received_messages = sub.pull
sub.acknowledge received_messages
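Pulling and batch acknowledgement can be combined into a loop that works through a backlog. The following is a sketch only: drain_subscription and FakePullSubscription are hypothetical names, and the loop assumes that pull returns an empty collection when no messages are available.

```ruby
# Hypothetical helper: pulls up to `max` messages at a time, yields each one
# for processing, then acknowledges the whole batch in a single call.
# Stops when a pull returns no messages and returns the number acknowledged.
def drain_subscription sub, max: 10
  total = 0
  loop do
    received_messages = sub.pull max: max
    break if received_messages.empty?

    received_messages.each { |received_message| yield received_message }
    sub.acknowledge received_messages
    total += received_messages.length
  end
  total
end

# Stand-in for a Subscription so the sketch runs without network access.
class FakePullSubscription
  attr_reader :acknowledged

  def initialize messages
    @backlog = messages.dup
    @acknowledged = []
  end

  def pull max: 100
    @backlog.shift max
  end

  def acknowledge *messages
    @acknowledged.concat messages.flatten
  end
end

sub = FakePullSubscription.new %w[a b c d e]
seen = []
count = drain_subscription(sub, max: 2) { |message| seen << message }
puts count # prints 5
```

Acknowledging only after the block has run for every message in the batch means that an exception during processing leaves the batch unacknowledged, so it becomes eligible for redelivery.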

Modifying a Deadline

A message must be acknowledged after it is pulled, or Pub/Sub will mark the message for redelivery. The message acknowledgement deadline can be delayed if more time is needed. This will allow more time to process the message before the message is marked for redelivery. (See ReceivedMessage#delay!)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
subscriber = sub.listen do |received_message|
  puts received_message.message.data

  # Delay for 2 minutes
  received_message.delay! 120
end

# Start background threads that will call the block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop.wait!

The message can also be made available for immediate redelivery:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
subscriber = sub.listen do |received_message|
  puts received_message.message.data

  # Mark for redelivery
  received_message.reject!
end

# Start background threads that will call the block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop.wait!

Multiple messages can be delayed or made available for immediate redelivery: (See Subscription#delay)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
received_messages = sub.pull
sub.delay 120, received_messages
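The example above shows only the delay case. The sketch below covers both cases, assuming that a deadline of 0 seconds releases messages for immediate redelivery. The triage_messages helper and FakeDelaySubscription are hypothetical names used for illustration.

```ruby
# Hypothetical helper: extends the deadline for messages the block selects
# and releases the rest for immediate redelivery (a deadline of 0 seconds).
def triage_messages sub, received_messages, extra_seconds: 120
  slow, rest = received_messages.partition { |message| yield message }
  sub.delay extra_seconds, slow unless slow.empty?
  sub.delay 0, rest unless rest.empty?
  { delayed: slow.length, released: rest.length }
end

# Stand-in for a Subscription that records each delay call.
class FakeDelaySubscription
  attr_reader :calls

  def initialize
    @calls = []
  end

  def delay new_deadline, *messages
    @calls << [new_deadline, messages.flatten]
  end
end

sub = FakeDelaySubscription.new
summary = triage_messages(sub, %w[slow-1 quick-1 slow-2]) do |message|
  message.start_with? "slow"
end
puts summary[:delayed]  # prints 2
puts summary[:released] # prints 1
```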

Creating a Snapshot and Using Seek

You can create a snapshot to retain the existing backlog on a subscription. The snapshot will hold the messages in the subscription's backlog that are unacknowledged upon the successful completion of the create_snapshot operation.

Later, you can use seek to reset the subscription's backlog to the snapshot.

(See Subscription#create_snapshot and Subscription#seek)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"

snapshot = sub.create_snapshot

received_messages = sub.pull
sub.acknowledge received_messages

sub.seek snapshot

Listening for Messages

A subscriber object can be created using listen, which streams messages from the backend and processes them as they are received. (See Subscription#listen and Subscriber)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end

# Start background threads that will call the block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop.wait!

The subscriber object can be configured to control the number of concurrent streams to open, the number of received messages to be collected, and the number of threads each stream opens for concurrent calls made to handle the received messages.

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen threads: { callback: 16 } do |received_message|
  # store the message somewhere before acknowledging
  store_in_backend received_message.data # takes a few seconds
  received_message.acknowledge!
end

# Start background threads that will call the block passed to listen.
subscriber.start
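The example above sets only the callback thread count. The sketch below groups all three settings described in this section into one options hash. The option names streams and inventory are assumptions that should be checked against Subscription#listen, the values are illustrative, and start_subscriber and the Fake classes are hypothetical.

```ruby
# Illustrative values only; tune them for your workload.
LISTEN_OPTIONS = {
  streams:   2,               # concurrent streams to open (assumed option name)
  inventory: 500,             # received messages to collect (assumed option name)
  threads:   { callback: 8 }  # callback threads per stream
}.freeze

# Hypothetical helper: creates a subscriber with the options above and starts it.
def start_subscriber sub, options = LISTEN_OPTIONS, &handler
  subscriber = sub.listen(**options, &handler)
  subscriber.start
  subscriber
end

# Stand-ins so the sketch runs without network access.
class FakeSubscriber
  attr_reader :options, :started

  def initialize options
    @options = options
    @started = false
  end

  def start
    @started = true
    self
  end
end

class FakeListenSubscription
  def listen **options, &_handler
    FakeSubscriber.new options
  end
end

subscriber = start_subscriber(FakeListenSubscription.new) do |received_message|
  received_message.acknowledge!
end
```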

Working Across Projects

All calls to the Pub/Sub service use the same project and credentials provided to the #pubsub method. However, it is common to reference topics or subscriptions in other projects, which can be achieved by using the project option. The main credentials must have permissions to the topics and subscriptions in other projects.

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new # my-project

# Get a topic in the current project
my_topic = pubsub.topic "my-topic"
my_topic.name #=> "projects/my-project/topics/my-topic"
# Get a topic in another project
other_topic = pubsub.topic "other-topic", project: "other-project-id"
other_topic.name #=> "projects/other-project-id/topics/other-topic"

It is possible to create a subscription in the current project that pulls from a topic in another project:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new # my-project

# Get a topic in another project
topic = pubsub.topic "other-topic", project: "other-project-id"
# Create a subscription in the current project that pulls from
# the topic in another project
sub = topic.subscribe "my-sub"
sub.name #=> "projects/my-project/subscriptions/my-sub"
sub.topic.name #=> "projects/other-project-id/topics/other-topic"
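The names shown in the comments above follow a fixed pattern. The helpers below are hypothetical and not part of this library. They only illustrate how a project identifier and a short name combine into a full resource name.

```ruby
# Hypothetical helpers that build full resource names from their parts.
def topic_path project_id, topic_name
  "projects/#{project_id}/topics/#{topic_name}"
end

def subscription_path project_id, subscription_name
  "projects/#{project_id}/subscriptions/#{subscription_name}"
end

puts topic_path("other-project-id", "other-topic")
# prints "projects/other-project-id/topics/other-topic"
puts subscription_path("my-project", "my-sub")
# prints "projects/my-project/subscriptions/my-sub"
```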

Using the Google Cloud Pub/Sub Emulator

To develop and test your application locally, you can use the Google Cloud Pub/Sub Emulator, which provides local emulation of the production Google Cloud Pub/Sub environment. You can start the Google Cloud Pub/Sub emulator using the gcloud command-line tool.

To configure your Ruby code to use the emulator, set the PUBSUB_EMULATOR_HOST environment variable to the host and port where the emulator is running. The value can be set as an environment variable in the shell running the Ruby code, or can be set directly in the Ruby code as shown below.

require "google/cloud/pubsub"

# Make Pub/Sub use the emulator
ENV["PUBSUB_EMULATOR_HOST"] = "localhost:8918"

pubsub = Google::Cloud::Pubsub.new project_id: "emulator-project-id"

# Create a topic in the emulator project
my_topic = pubsub.new_topic "my-topic"
my_topic.name #=> "projects/emulator-project-id/topics/my-topic"

Defined Under Namespace

Modules: V1

Classes: AsyncPublisher, BatchPublisher, Credentials, Message, Policy, Project, PublishResult, ReceivedMessage, Snapshot, Subscriber, Subscription, Topic

Constant Summary

VERSION = "0.32.0".freeze

Class Method Summary

Class Method Details

.configure {|Google::Cloud.configure.pubsub| ... } ⇒ Google::Cloud::Config

Configure the Google Cloud Pubsub library.

The following Pubsub configuration parameters are supported:

  • project_id - (String) Identifier for a Pubsub project. (The parameter project is considered deprecated, but may also be used.)
  • credentials - (String, Hash, Google::Auth::Credentials) The path to the keyfile as a String, the contents of the keyfile as a Hash, or a Google::Auth::Credentials object. (See Credentials) (The parameter keyfile is considered deprecated, but may also be used.)
  • scope - (String, Array) The OAuth 2.0 scopes controlling the set of resources and operations that the connection can access.
  • retries - (Integer) Number of times to retry requests on server error.
  • timeout - (Integer) Default timeout to use in requests.
  • client_config - (Hash) A hash of values to override the default behavior of the API client.
  • emulator_host - (String) Host name of the emulator. Defaults to ENV["PUBSUB_EMULATOR_HOST"]
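As a sketch of how these parameters might be set, the block below assigns a few of them once at application start-up. The project name, keyfile path, and timeout value are placeholders, not recommendations.

```ruby
require "google/cloud/pubsub"

Google::Cloud::Pubsub.configure do |config|
  config.project_id  = "my-project"
  config.credentials = "/path/to/keyfile.json" # placeholder path
  config.timeout     = 10                      # illustrative value
end

# Later calls to .new fall back to the configured values
# for any argument that is not passed explicitly.
pubsub = Google::Cloud::Pubsub.new
```

Arguments passed directly to .new take precedence over the configured values.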

Yields:

  • (Google::Cloud.configure.pubsub)

Returns:

  • (Google::Cloud::Config)

    The configuration object the Google::Cloud::Pubsub library uses.



# File 'lib/google/cloud/pubsub.rb', line 620

def self.configure
  yield Google::Cloud.configure.pubsub if block_given?

  Google::Cloud.configure.pubsub
end

.new(project_id: nil, credentials: nil, scope: nil, timeout: nil, client_config: nil, emulator_host: nil, project: nil, keyfile: nil) ⇒ Google::Cloud::Pubsub::Project

Creates a new object for connecting to the Pub/Sub service. Each call creates a new connection.

For more information on connecting to Google Cloud see the Authentication Guide.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
topic.publish "task completed"

Parameters:

  • project_id (String)

    Project identifier for the Pub/Sub service you are connecting to. If not present, the default project for the credentials is used.

  • credentials (String, Hash, Google::Auth::Credentials)

    The path to the keyfile as a String, the contents of the keyfile as a Hash, or a Google::Auth::Credentials object. (See Credentials)

  • scope (String, Array<String>)

    The OAuth 2.0 scopes controlling the set of resources and operations that the connection can access. See Using OAuth 2.0 to Access Google APIs.

    The default scope is:

    • https://www.googleapis.com/auth/pubsub
  • timeout (Integer)

    Default timeout to use in requests. Optional.

  • client_config (Hash)

    A hash of values to override the default behavior of the API client. Optional.

  • emulator_host (String)

    Pub/Sub emulator host. Optional. If the param is nil, uses the value of the emulator_host config.

  • project (String)

    Alias for the project_id argument. Deprecated.

  • keyfile (String)

    Alias for the credentials argument. Deprecated.

Returns:

  • (Google::Cloud::Pubsub::Project)

Raises:

  • (ArgumentError)


# File 'lib/google/cloud/pubsub.rb', line 563

def self.new project_id: nil, credentials: nil, scope: nil, timeout: nil,
             client_config: nil, emulator_host: nil, project: nil,
             keyfile: nil
  project_id ||= (project || default_project_id)
  project_id = project_id.to_s # Always cast to a string
  raise ArgumentError, "project_id is missing" if project_id.empty?

  scope ||= configure.scope
  timeout ||= configure.timeout
  client_config ||= configure.client_config
  emulator_host ||= configure.emulator_host
  if emulator_host
    return Pubsub::Project.new(
      Pubsub::Service.new(
        project_id, :this_channel_is_insecure,
        host: emulator_host
      )
    )
  end

  credentials ||= (keyfile || default_credentials(scope: scope))
  unless credentials.is_a? Google::Auth::Credentials
    credentials = Pubsub::Credentials.new credentials, scope: scope
  end

  Pubsub::Project.new(
    Pubsub::Service.new(
      project_id, credentials, timeout: timeout,
                               client_config: client_config
    )
  )
end
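The order in which the listing above chooses a project can be sketched in isolation. The resolve_project_id method is a hypothetical stand-in, and its default argument takes the place of default_project_id, whose lookup is not shown here.

```ruby
# Hypothetical stand-in for the project resolution at the top of .new:
# the explicit project_id wins, then the deprecated project alias,
# then whatever default is available.
def resolve_project_id project_id: nil, project: nil, default: nil
  resolved = (project_id || project || default).to_s
  raise ArgumentError, "project_id is missing" if resolved.empty?
  resolved
end

puts resolve_project_id(project_id: "explicit", project: "alias", default: "fallback")
# prints "explicit"
puts resolve_project_id(project: "alias", default: "fallback")
# prints "alias"
puts resolve_project_id(default: "fallback")
# prints "fallback"

begin
  resolve_project_id
rescue ArgumentError => e
  puts e.message # prints "project_id is missing"
end
```

Because the value is cast with to_s before the check, a nil project and an empty string are treated the same way: both raise ArgumentError.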