Class: Google::Cloud::Pubsub::Topic

Inherits:
Object
  • Object
Defined in:
lib/google/cloud/pubsub/topic.rb,
lib/google/cloud/pubsub/topic/list.rb

Overview

Topic

A named resource to which messages are published.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
topic.publish "task completed"

Defined Under Namespace

Classes: List

Instance Method Details

#async_publisher ⇒ AsyncPublisher

AsyncPublisher object used to publish multiple messages in batches.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
topic.publish_async "task completed" do |result|
  if result.succeeded?
    log_publish_success result.data
  else
    log_publish_failure result.data, result.error
  end
end

topic.async_publisher.stop.wait!

Returns:

  • (AsyncPublisher)

# File 'lib/google/cloud/pubsub/topic.rb', line 80

def async_publisher
  @async_publisher
end

#delete ⇒ Boolean

Permanently deletes the topic.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
topic.delete

Returns:

  • (Boolean)

    Returns true if the topic was deleted.



# File 'lib/google/cloud/pubsub/topic.rb', line 104

def delete
  ensure_service!
  service.delete_topic name
  true
end

#exists? ⇒ Boolean

Determines whether the topic exists in the Pub/Sub service.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
topic.exists? #=> true

Returns:

  • (Boolean)


# File 'lib/google/cloud/pubsub/topic.rb', line 506

def exists?
  # Always true if the object is not set as lazy
  return true unless lazy?
  # If we have a value, return it
  return @exists unless @exists.nil?
  ensure_grpc!
  @exists = true
rescue Google::Cloud::NotFoundError
  @exists = false
end
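
The caching behavior in the source above can be sketched in isolation. This is a minimal stand-in, not the library's class: LazyResource, NotFound, and the lookup callable are hypothetical names used only for illustration, and the lookup counter exists just to make the caching visible.

```ruby
# Hypothetical stand-in for Google::Cloud::NotFoundError.
class NotFound < StandardError; end

# Hypothetical resource that mirrors the memoization pattern of Topic#exists?.
class LazyResource
  attr_reader :lookups

  # lookup: a callable that raises NotFound when the resource is missing.
  def initialize(lazy:, lookup:)
    @lazy = lazy
    @lookup = lookup
    @exists = nil
    @lookups = 0
  end

  def exists?
    # A non-lazy object was loaded from the service, so it is treated as existing.
    return true unless @lazy
    # Reuse the cached answer, including a cached false.
    return @exists unless @exists.nil?
    @lookups += 1
    @lookup.call
    @exists = true
  rescue NotFound
    @exists = false
  end
end

missing = LazyResource.new(lazy: true, lookup: -> { raise NotFound })
missing.exists? # first call performs the lookup
missing.exists? # second call reuses the cached result
puts missing.lookups
```

Under this pattern, a lazily created object costs at most one service lookup for repeated existence checks, and a negative answer is cached as well as a positive one.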

#name ⇒ Object

The name of the topic in the form of "projects/project-identifier/topics/topic-name".



# File 'lib/google/cloud/pubsub/topic.rb', line 87

def name
  @grpc.name
end

#policy {|policy| ... } ⇒ Policy

Gets the Cloud IAM access control policy for this topic.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic "my-topic"

policy = topic.policy

Update the policy by passing a block:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic "my-topic"

topic.policy do |p|
  p.add "roles/owner", "user:owner@example.com"
end

Yields:

  • (policy)

    A block for updating the policy. The latest policy will be read from the Pub/Sub service and passed to the block. After the block completes, the modified policy will be written to the service.

Yield Parameters:

  • policy (Policy)

    the current Cloud IAM Policy for this topic

Returns:

  • (Policy)

    the current Cloud IAM Policy for this topic



# File 'lib/google/cloud/pubsub/topic.rb', line 409

def policy
  ensure_service!
  grpc = service.get_topic_policy name
  policy = Policy.from_grpc grpc
  return policy unless block_given?
  yield policy
  update_policy policy
end
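
The read, yield, write flow in the source above can be sketched without the Pub/Sub service. FakePolicyService and read_modify_write are hypothetical names, and a plain Hash of role to members stands in for the Policy class; none of this is the library's API.

```ruby
# Hypothetical in-memory stand-in for the service's IAM policy storage.
class FakePolicyService
  attr_reader :writes

  def initialize
    @roles = {}
    @writes = 0
  end

  # Returns a copy so callers cannot change stored state without a write.
  def get_policy
    @roles.transform_values(&:dup)
  end

  def set_policy(roles)
    @writes += 1
    @roles = roles.transform_values(&:dup)
    get_policy
  end
end

# Mirrors the control flow of Topic#policy: read, optionally yield, then write.
def read_modify_write(service)
  current = service.get_policy
  return current unless block_given?
  yield current
  service.set_policy current
end

service = FakePolicyService.new
read_modify_write(service) # read only, nothing is written
updated = read_modify_write(service) do |policy|
  (policy["roles/owner"] ||= []) << "user:owner@example.com"
end
puts updated.inspect
```

The point of the sketch is that calling without a block never writes, while the block form always performs exactly one write after the block returns.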

#publish(data = nil, attributes = {}) {|batch| ... } ⇒ Message+

Publishes one or more messages to the topic.

The message payload must not be empty; it must contain either a non-empty data field, or at least one attribute.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
msg = topic.publish "task completed"

A message can be published using a File object:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
file = File.open "message.txt", mode: "rb"
msg = topic.publish file

Additionally, a message can be published with attributes:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
msg = topic.publish "task completed",
                    foo: :bar,
                    this: :that

Multiple messages can be sent at the same time using a block:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
msgs = topic.publish do |t|
  t.publish "task 1 completed", foo: :bar
  t.publish "task 2 completed", foo: :baz
  t.publish "task 3 completed", foo: :bif
end

Parameters:

  • data (String, File) (defaults to: nil)

    The message payload. This will be converted to bytes encoded as ASCII-8BIT.

  • attributes (Hash) (defaults to: {})

    Optional attributes for the message.

Yields:

  • (batch)

    a block for publishing multiple messages in one request

Yield Parameters:

  • batch (BatchPublisher)

    the batch publisher object that collects the messages

Returns:

  • (Message, Array<Message>)

    Returns the published message when called without a block, or an array of messages when called with a block.



# File 'lib/google/cloud/pubsub/topic.rb', line 309

def publish data = nil, attributes = {}
  ensure_service!
  batch = BatchPublisher.new data, attributes
  yield batch if block_given?
  return nil if batch.messages.count.zero?
  publish_batch_messages batch
end
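
The way the source above collects messages can be sketched with a simplified stand-in. SketchBatch and sketch_publish are hypothetical names, messages are plain hashes rather than Message objects, and stringifying the attributes is an assumption of this sketch rather than confirmed library behavior.

```ruby
# Hypothetical, simplified stand-in for the batch object yielded by Topic#publish.
class SketchBatch
  attr_reader :messages

  def initialize(data = nil, attributes = {})
    @messages = []
    # Only queue an initial message when there is something to send.
    publish(data, attributes) unless data.nil? && attributes.empty?
  end

  def publish(data, attributes = {})
    # Read File-like objects, then tag the payload bytes as ASCII-8BIT.
    data = data.read if data.respond_to? :read
    payload = String(data).dup.force_encoding(Encoding::ASCII_8BIT)
    # Assumption of this sketch: attribute keys and values become strings.
    attrs = attributes.map { |key, value| [key.to_s, value.to_s] }.to_h
    @messages << { data: payload, attributes: attrs }
  end
end

# Mirrors the control flow of Topic#publish without calling any service.
def sketch_publish(data = nil, attributes = {})
  batch = SketchBatch.new data, attributes
  yield batch if block_given?
  return nil if batch.messages.empty?
  batch.messages
end

msgs = sketch_publish do |t|
  t.publish "task 1 completed", foo: :bar
  t.publish "task 2 completed", foo: :baz
end
puts msgs.length
```

As in the source, a call that supplies neither data, attributes, nor a block that queues anything returns nil instead of sending an empty request.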

#publish_async(data = nil, attributes = {}) {|result| ... } ⇒ Object

Publishes a message asynchronously to the topic.

The message payload must not be empty; it must contain either a non-empty data field, or at least one attribute.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
topic.publish_async "task completed" do |result|
  if result.succeeded?
    log_publish_success result.data
  else
    log_publish_failure result.data, result.error
  end
end

topic.async_publisher.stop.wait!

A message can be published using a File object:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
file = File.open "message.txt", mode: "rb"
topic.publish_async file

topic.async_publisher.stop.wait!

Additionally, a message can be published with attributes:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
topic.publish_async "task completed",
                    foo: :bar, this: :that

topic.async_publisher.stop.wait!

Parameters:

  • data (String, File) (defaults to: nil)

    The message payload. This will be converted to bytes encoded as ASCII-8BIT.

  • attributes (Hash) (defaults to: {})

    Optional attributes for the message.

Yields:

  • (result)

    the callback for when the message has been published

Yield Parameters:

  • result (PublishResult)

    the result of the asynchronous publish



# File 'lib/google/cloud/pubsub/topic.rb', line 368

def publish_async data = nil, attributes = {}, &block
  ensure_service!

  @async_publisher ||= AsyncPublisher.new(name, service, @async_opts)
  @async_publisher.publish data, attributes, &block
end

#subscribe(subscription_name, deadline: nil, retain_acked: false, retention: nil, endpoint: nil) ⇒ Google::Cloud::Pubsub::Subscription Also known as: create_subscription, new_subscription

Creates a new Subscription object on the current Topic.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
sub = topic.subscribe "my-topic-sub"
sub.name #=> "projects/my-project/subscriptions/my-topic-sub"

Wait 2 minutes for acknowledgement and push all to endpoint:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
sub = topic.subscribe "my-topic-sub",
                      deadline: 120,
                      endpoint: "https://example.com/push"

Parameters:

  • subscription_name (String)

    Name of the new subscription. Must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and it must not start with "goog". Required.

  • deadline (Integer)

    The maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.

  • retain_acked (Boolean)

    Indicates whether to retain acknowledged messages. If true, then messages are not expunged from the subscription's backlog, even if they are acknowledged, until they fall out of the retention_duration window. Default is false.

  • retention (Numeric)

    How long to retain unacknowledged messages in the subscription's backlog, from the moment a message is published. If retain_acked is true, then this also configures the retention of acknowledged messages, and thus configures how far back in time a #seek can be done. Cannot be more than 604,800 seconds (7 days) or less than 600 seconds (10 minutes). Default is 604,800 seconds (7 days).

  • endpoint (String)

    A URL locating the endpoint to which messages should be pushed.

Returns:

  • (Google::Cloud::Pubsub::Subscription)

# File 'lib/google/cloud/pubsub/topic.rb', line 156

def subscribe subscription_name, deadline: nil, retain_acked: false,
              retention: nil, endpoint: nil
  ensure_service!
  options = { deadline: deadline, retain_acked: retain_acked,
              retention: retention, endpoint: endpoint }
  grpc = service.create_subscription name, subscription_name, options
  Subscription.from_grpc grpc, service
end
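
The naming and retention rules listed under Parameters can be checked locally before calling #subscribe. The helpers below are hypothetical and not part of google-cloud-pubsub; they are a sketch that encodes only the constraints stated in the parameter descriptions, and the service remains the authority on what it accepts.

```ruby
# Hypothetical helper: checks the documented subscription_name rules.
def valid_subscription_name?(name)
  # One leading letter plus 2 to 254 allowed characters gives 3 to 255 in total.
  pattern = /\A[A-Za-z][A-Za-z0-9\-_.~+%]{2,254}\z/
  pattern.match?(name) && !name.start_with?("goog")
end

# Hypothetical helper: checks the documented retention bounds in seconds.
def valid_retention?(seconds)
  # nil means "use the default", which the documentation gives as 7 days.
  seconds.nil? || (600..604_800).cover?(seconds)
end

puts valid_subscription_name?("my-topic-sub")
puts valid_subscription_name?("goog-sub")
puts valid_retention?(120)
```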

#subscription(subscription_name, skip_lookup: nil) ⇒ Google::Cloud::Pubsub::Subscription? Also known as: get_subscription, find_subscription

Retrieves subscription by name.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"

sub = topic.subscription "my-topic-sub"
sub.name #=> "projects/my-project/subscriptions/my-topic-sub"

Skip the lookup against the service with skip_lookup:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"

# No API call is made to retrieve the subscription information.
sub = topic.subscription "my-topic-sub", skip_lookup: true
sub.name #=> "projects/my-project/subscriptions/my-topic-sub"

Parameters:

  • subscription_name (String)

    Name of a subscription.

  • skip_lookup (Boolean)

    Optionally create a Subscription object without verifying the subscription resource exists on the Pub/Sub service. Calls made on this object will raise errors if the service resource does not exist. Default is false.

Returns:

  • (Google::Cloud::Pubsub::Subscription, nil)

    Returns nil if the subscription does not exist.

# File 'lib/google/cloud/pubsub/topic.rb', line 200

def subscription subscription_name, skip_lookup: nil
  ensure_service!
  return Subscription.new_lazy subscription_name, service if skip_lookup
  grpc = service.get_subscription subscription_name
  Subscription.from_grpc grpc, service
rescue Google::Cloud::NotFoundError
  nil
end

#subscriptions(token: nil, max: nil) ⇒ Array<Subscription> Also known as: find_subscriptions, list_subscriptions

Retrieves a list of subscriptions for this topic.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
subscriptions = topic.subscriptions
subscriptions.each do |subscription|
  puts subscription.name
end

Retrieve all subscriptions: (See Subscription::List#all)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
subscriptions = topic.subscriptions
subscriptions.all do |subscription|
  puts subscription.name
end

Parameters:

  • token (String)

    The token value returned by the last call to subscriptions; indicates that this is a continuation of a call, and that the system should return the next page of data.

  • max (Integer)

    Maximum number of subscriptions to return.

Returns:

  • (Array<Subscription>)

    (See Subscription::List)

# File 'lib/google/cloud/pubsub/topic.rb', line 243

def subscriptions token: nil, max: nil
  ensure_service!
  options = { token: token, max: max }
  grpc = service.list_topics_subscriptions name, options
  Subscription::List.from_topic_grpc grpc, service, name, max
end
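
The token parameter implies a continuation loop: request a page, keep the returned token, and pass it to the next call until no token comes back. The sketch below shows that loop against an in-memory stand-in. The pages hash, fetch_page lambda, and all_items method are hypothetical, and in the library the Subscription::List#all call shown in the example above is the documented way to walk every page.

```ruby
# Hypothetical paged data: token => [items, next_token]; nil is the first page.
pages = {
  nil      => [%w[sub-a sub-b], "page-2"],
  "page-2" => [%w[sub-c sub-d], "page-3"],
  "page-3" => [%w[sub-e], nil]
}

# Stands in for one call such as topic.subscriptions token: token.
fetch_page = ->(token) { pages.fetch token }

# Follows continuation tokens until the last page reports no next token.
def all_items(fetch_page)
  items = []
  token = nil
  loop do
    page, token = fetch_page.call token
    items.concat page
    break if token.nil?
  end
  items
end

puts all_items(fetch_page).inspect
```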

#test_permissions(*permissions) ⇒ Array<String>

Tests the specified permissions against the Cloud IAM access control policy.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic "my-topic"
perms = topic.test_permissions "pubsub.topics.get",
                               "pubsub.topics.publish"
perms.include? "pubsub.topics.get" #=> true
perms.include? "pubsub.topics.publish" #=> false

Parameters:

  • permissions (String, Array<String>)

    The set of permissions to check access for. Permissions with wildcards (such as * or storage.*) are not allowed.

    The permissions that can be checked on a topic are:

    • pubsub.topics.publish
    • pubsub.topics.attachSubscription
    • pubsub.topics.get
    • pubsub.topics.delete
    • pubsub.topics.update
    • pubsub.topics.getIamPolicy
    • pubsub.topics.setIamPolicy

Returns:

  • (Array<String>)

    The permissions that have access.



# File 'lib/google/cloud/pubsub/topic.rb', line 487

def test_permissions *permissions
  permissions = Array(permissions).flatten
  ensure_service!
  grpc = service.test_topic_permissions name, permissions
  grpc.permissions
end
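
Because the method takes a splat and flattens it, permissions can be passed as separate strings, as one array, or as a mix of both. The sketch below isolates that normalization and adds a local pre-check for the documented wildcard restriction. Both helpers are hypothetical and not part of google-cloud-pubsub.

```ruby
# Hypothetical helper mirroring the argument normalization in Topic#test_permissions.
def normalize_permissions(*permissions)
  Array(permissions).flatten
end

# Hypothetical pre-check for the documented "no wildcards" rule.
def wildcard?(permission)
  permission.include? "*"
end

requested = normalize_permissions "pubsub.topics.get", ["pubsub.topics.publish"]
rejected = requested.select { |permission| wildcard? permission }
puts requested.inspect
puts rejected.inspect
```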

#update_policy(new_policy) ⇒ Policy Also known as: policy=

Updates the Cloud IAM access control policy for this topic. The policy should be read from #policy. See Policy for an explanation of the policy etag property and how to modify policies.

You can also update the policy by passing a block to #policy, which will call this method internally after the block completes.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic "my-topic"

policy = topic.policy # API call

policy.add "roles/owner", "user:owner@example.com"

topic.update_policy policy # API call

Parameters:

  • new_policy (Policy)

    a new or modified Cloud IAM Policy for this topic

Returns:

  • (Policy)

    the policy returned by the API update operation



# File 'lib/google/cloud/pubsub/topic.rb', line 447

def update_policy new_policy
  ensure_service!
  grpc = service.set_topic_policy name, new_policy.to_grpc
  @policy = Policy.from_grpc grpc
end