Class: Google::Cloud::Pubsub::Topic

Inherits:
Object
Defined in:
lib/google/cloud/pubsub/topic.rb,
lib/google/cloud/pubsub/topic/list.rb,
lib/google/cloud/pubsub/topic/publisher.rb

Overview

Topic

A named resource to which messages are published.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
topic.publish "task completed"

Defined Under Namespace

Classes: List, Publisher

Instance Method Details

#delete ⇒ Boolean

Permanently deletes the topic.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
topic.delete

Returns:

  • (Boolean)

    Returns true if the topic was deleted.



# File 'lib/google/cloud/pubsub/topic.rb', line 86

def delete
  ensure_service!
  service.delete_topic name
  true
end

#exists? ⇒ Boolean

Determines whether the topic exists in the Pub/Sub service.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
topic.exists? #=> true

Returns:

  • (Boolean)


# File 'lib/google/cloud/pubsub/topic.rb', line 423

def exists?
  # Always true if we have a grpc object
  return true unless @grpc.nil?
  # If we have a value, return it
  return @exists unless @exists.nil?
  ensure_grpc!
  @exists = !@grpc.nil?
end
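
The caching in exists? has three states: a loaded record (always true), a remembered answer (true or false), and unknown (which triggers one lookup). The following is a minimal sketch of that pattern only. LazyResource and its lookup callable are hypothetical stand-ins, not library classes.

```ruby
# Hypothetical stand-in for a lazily loaded resource; not part of the library.
class LazyResource
  attr_reader :lookups

  # lookup is any callable that returns a record, or nil when nothing is found.
  def initialize lookup
    @lookup = lookup
    @record = nil
    @exists = nil
    @lookups = 0
  end

  def exists?
    # A loaded record proves existence.
    return true unless @record.nil?
    # A remembered answer (true or false) avoids a second lookup.
    return @exists unless @exists.nil?
    @lookups += 1
    @record = @lookup.call
    @exists = !@record.nil?
  end
end

missing = LazyResource.new(-> { nil })
missing.exists?
missing.exists?
puts missing.lookups # prints 1: the negative answer was cached
```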

#name ⇒ Object

The name of the topic in the form of "projects/project-identifier/topics/topic-name".



# File 'lib/google/cloud/pubsub/topic.rb', line 69

def name
  @grpc ? @grpc.name : @name
end

#policy {|policy| ... } ⇒ Policy

Gets the Cloud IAM access control policy for this topic.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic "my-topic"

policy = topic.policy

Update the policy by passing a block:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic "my-topic"

topic.policy do |p|
  p.add "roles/owner", "user:owner@example.com"
end

Yields:

  • (policy)

    A block for updating the policy. The latest policy will be read from the Pub/Sub service and passed to the block. After the block completes, the modified policy will be written to the service.

Yield Parameters:

  • policy (Policy)

    the current Cloud IAM Policy for this topic

Returns:

  • (Policy)

    the current Cloud IAM Policy for this topic

# File 'lib/google/cloud/pubsub/topic.rb', line 327

def policy
  ensure_service!
  grpc = service.get_topic_policy name
  policy = Policy.from_grpc grpc
  return policy unless block_given?
  yield policy
  self.policy = policy
end
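
The block form is a read-modify-write cycle: changes made to a policy obtained without a block stay local until they are written back. The sketch below imitates that control flow with hypothetical stand-ins (SketchPolicy, SketchTopic, and an in-memory hash in place of the service); it is not the library's implementation.

```ruby
# Hypothetical stand-ins; the real Policy and service classes are not used here.
SketchPolicy = Struct.new(:roles) do
  def add role, member
    (roles[role] ||= []) << member
  end
end

class SketchTopic
  attr_reader :writes

  def initialize
    @stored = { "roles/viewer" => ["user:viewer@example.com"] }
    @writes = 0
  end

  # Reads a fresh copy; with a block, writes the modified copy back.
  def policy
    policy = SketchPolicy.new(@stored.transform_values(&:dup))
    return policy unless block_given?
    yield policy
    self.policy = policy
  end

  def policy= new_policy
    @writes += 1
    @stored = new_policy.roles
  end
end

topic = SketchTopic.new
topic.policy.add "roles/owner", "user:owner@example.com" # local copy only
topic.policy { |p| p.add "roles/owner", "user:owner@example.com" }
puts topic.writes # prints 1: only the block form wrote the policy back
```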

#policy=(new_policy) ⇒ Policy

Updates the Cloud IAM access control policy for this topic. The policy should be read from #policy. See Policy for an explanation of the policy etag property and how to modify policies.

You can also update the policy by passing a block to #policy, which will call this method internally after the block completes.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic "my-topic"

policy = topic.policy # API call

policy.add "roles/owner", "user:owner@example.com"

topic.policy = policy # API call

Parameters:

  • new_policy (Policy)

    a new or modified Cloud IAM Policy for this topic

Returns:

  • (Policy)

    the policy returned by the API update operation

# File 'lib/google/cloud/pubsub/topic.rb', line 365

def policy= new_policy
  ensure_service!
  grpc = service.set_topic_policy name, new_policy.to_grpc
  @policy = Policy.from_grpc grpc
end

#publish(data = nil, attributes = {}) {|publisher| ... } ⇒ Message, Array<Message>

Publishes one or more messages to the topic.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
msg = topic.publish "task completed"

A message can be published using a File object:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
msg = topic.publish File.open("message.txt")

Additionally, a message can be published with attributes:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
msg = topic.publish "task completed",
                    foo: :bar,
                    this: :that

Multiple messages can be sent at the same time using a block:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
msgs = topic.publish do |t|
  t.publish "task 1 completed", foo: :bar
  t.publish "task 2 completed", foo: :baz
  t.publish "task 3 completed", foo: :bif
end

Parameters:

  • data (String, File) (defaults to: nil)

    The message data.

  • attributes (Hash) (defaults to: {})

    Optional attributes for the message.

Yields:

  • (publisher)

    a block for publishing multiple messages in one request

Yield Parameters:

  • publisher (Publisher)

    the publisher object that collects the messages to send

Returns:

  • (Message, Array<Message>)

    Returns the published message when called without a block, or an array of messages when called with a block. Returns nil when there are no messages to publish.



# File 'lib/google/cloud/pubsub/topic.rb', line 285

def publish data = nil, attributes = {}
  ensure_service!
  publisher = Publisher.new data, attributes
  yield publisher if block_given?
  return nil if publisher.messages.count.zero?
  publish_batch_messages publisher
end
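
The control flow above can be tried without a service connection. The sketch below is illustrative only: SketchPublisher and sketch_publish are hypothetical names, the behavior of the collector for nil data is an assumption, and the collected batch is returned instead of being sent.

```ruby
# Hypothetical stand-in for the collector yielded to the block.
class SketchPublisher
  attr_reader :messages

  def initialize data = nil, attributes = {}
    @messages = []
    # Assumption: nil data means there is no initial message.
    publish data, attributes if data
  end

  def publish data, attributes = {}
    @messages << [data, attributes]
  end
end

# Follows the control flow of the method above; returns the batch instead of sending it.
def sketch_publish data = nil, attributes = {}
  publisher = SketchPublisher.new data, attributes
  yield publisher if block_given?
  return nil if publisher.messages.count.zero?
  publisher.messages
end

batch = sketch_publish do |t|
  t.publish "task 1 completed", foo: :bar
  t.publish "task 2 completed", foo: :baz
end
puts batch.length # prints 2
p sketch_publish  # prints nil: nothing was collected
```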

#subscribe(subscription_name, deadline: nil, retain_acked: false, retention: nil, endpoint: nil) ⇒ Google::Cloud::Pubsub::Subscription

Also known as: create_subscription, new_subscription

Creates a new Subscription object on the current Topic.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
sub = topic.subscribe "my-topic-sub"
sub.name #=> "projects/my-project/subscriptions/my-topic-sub"

Wait 2 minutes for acknowledgement and push all to endpoint:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
sub = topic.subscribe "my-topic-sub",
                      deadline: 120,
                      endpoint: "https://example.com/push"

Parameters:

  • subscription_name (String)

    Name of the new subscription. Must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus signs (+) or percent signs (%). It must be between 3 and 255 characters in length, and it must not start with "goog". Required.

  • deadline (Integer)

    The maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.

  • retain_acked (Boolean)

    Indicates whether to retain acknowledged messages. If true, then messages are not expunged from the subscription's backlog, even if they are acknowledged, until they fall out of the retention_duration window. Default is false.

  • retention (Numeric)

    How long to retain unacknowledged messages in the subscription's backlog, from the moment a message is published. If retain_acked is true, then this also configures the retention of acknowledged messages, and thus configures how far back in time a #seek can be done. Cannot be more than 604,800 seconds (7 days) or less than 600 seconds (10 minutes). Default is 604,800 seconds (7 days).

  • endpoint (String)

    A URL locating the endpoint to which messages should be pushed.

Returns:

  • (Google::Cloud::Pubsub::Subscription)



# File 'lib/google/cloud/pubsub/topic.rb', line 138

def subscribe subscription_name, deadline: nil, retain_acked: false,
              retention: nil, endpoint: nil
  ensure_service!
  options = { deadline: deadline, retain_acked: retain_acked,
              retention: retention, endpoint: endpoint }
  grpc = service.create_subscription name, subscription_name, options
  Subscription.from_grpc grpc, service
end
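
The method above passes its arguments straight to the service, so invalid values surface only as service errors. As a sketch, the documented naming and retention rules can be checked locally first. Both helper names below are hypothetical and not part of the library.

```ruby
# Hypothetical pre-flight checks based on the parameter rules documented above.
def valid_subscription_name? name
  # A letter, then 2 to 254 allowed characters, for 3 to 255 in total.
  pattern = /\A[A-Za-z][A-Za-z0-9\-_.~+%]{2,254}\z/
  pattern.match?(name) && !name.start_with?("goog")
end

def valid_retention? seconds
  # nil means "use the service default"; otherwise 10 minutes to 7 days.
  seconds.nil? || (600..604_800).cover?(seconds)
end

puts valid_subscription_name?("my-topic-sub") # prints true
puts valid_subscription_name?("google-sub")   # prints false: starts with "goog"
puts valid_retention?(120)                    # prints false: below 600 seconds
```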

#subscription(subscription_name, skip_lookup: nil) ⇒ Google::Cloud::Pubsub::Subscription?

Also known as: get_subscription, find_subscription

Retrieves a subscription by name.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"

sub = topic.subscription "my-topic-sub"
sub.name #=> "projects/my-project/subscriptions/my-topic-sub"

Skip the lookup against the service with skip_lookup:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"

# No API call is made to retrieve the subscription information.
sub = topic.subscription "my-topic-sub", skip_lookup: true
sub.name #=> "projects/my-project/subscriptions/my-topic-sub"

Parameters:

  • subscription_name (String)

    Name of a subscription.

  • skip_lookup (Boolean)

    Optionally create a Subscription object without verifying the subscription resource exists on the Pub/Sub service. Calls made on this object will raise errors if the service resource does not exist. Default is false.

Returns:

  • (Google::Cloud::Pubsub::Subscription, nil)

    Returns nil if the subscription does not exist.



# File 'lib/google/cloud/pubsub/topic.rb', line 182

def subscription subscription_name, skip_lookup: nil
  ensure_service!
  return Subscription.new_lazy subscription_name, service if skip_lookup
  grpc = service.get_subscription subscription_name
  Subscription.from_grpc grpc, service
rescue Google::Cloud::NotFoundError
  nil
end
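
The difference between the two paths is where a missing subscription is noticed: the lookup path turns a not-found error into nil, while skip_lookup never contacts the service. The sketch below shows this with hypothetical stand-ins (SketchService, SketchNotFound, find_subscription); plain strings take the place of Subscription objects.

```ruby
# Hypothetical stand-ins for the service and its not-found error.
class SketchNotFound < StandardError; end

class SketchService
  attr_reader :calls

  def initialize names
    @names = names
    @calls = 0
  end

  def get_subscription name
    @calls += 1
    raise SketchNotFound, name unless @names.include? name
    name
  end
end

def find_subscription service, name, skip_lookup: nil
  return "lazy:#{name}" if skip_lookup # no service call, no existence check
  service.get_subscription name
rescue SketchNotFound
  nil
end

service = SketchService.new(["my-topic-sub"])
p find_subscription(service, "missing-sub")                    # prints nil
p find_subscription(service, "missing-sub", skip_lookup: true) # prints "lazy:missing-sub"
puts service.calls                                             # prints 1
```

With skip_lookup, the caller should be prepared for errors on later calls if the resource does not exist.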

#subscriptions(token: nil, max: nil) ⇒ Array<Subscription>

Also known as: find_subscriptions, list_subscriptions

Retrieves a list of subscriptions attached to this topic.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
subscriptions = topic.subscriptions
subscriptions.each do |subscription|
  puts subscription.name
end

Retrieve all subscriptions: (See Subscription::List#all)

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
subscriptions = topic.subscriptions
subscriptions.all do |subscription|
  puts subscription.name
end

Parameters:

  • token (String)

    The token value returned by the last call to subscriptions; indicates that this is a continuation of a call, and that the system should return the next page of data.

  • max (Integer)

    Maximum number of subscriptions to return.

Returns:

  • (Array<Subscription>)

    (See Subscription::List)



# File 'lib/google/cloud/pubsub/topic.rb', line 225

def subscriptions token: nil, max: nil
  ensure_service!
  options = { token: token, max: max }
  grpc = service.list_topics_subscriptions name, options
  Subscription::List.from_topic_grpc grpc, service, name, max
end
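
The token and max parameters describe page-at-a-time iteration: each call returns one page plus a token for the next. The sketch below shows a generic token loop over an in-memory list. The list_page helper and its integer-offset tokens are hypothetical; real tokens are opaque strings issued by the service.

```ruby
# Hypothetical paged source: returns [items, next_token]; next_token is nil on the last page.
def list_page items, token: nil, max: nil
  start = token ? Integer(token) : 0
  size = max || items.length
  page = items[start, size] || []
  next_start = start + page.length
  next_token = next_start < items.length ? next_start.to_s : nil
  [page, next_token]
end

names = %w[sub-a sub-b sub-c sub-d sub-e]
all = []
token = nil
loop do
  page, token = list_page(names, token: token, max: 2)
  all.concat page
  break if token.nil?
end
puts all.length # prints 5: three pages of at most two names each
```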

#test_permissions(*permissions) ⇒ Array<String>

Tests the specified permissions against the Cloud IAM access control policy.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic "my-topic"
perms = topic.test_permissions "pubsub.topics.get",
                               "pubsub.topics.publish"
perms.include? "pubsub.topics.get" #=> true
perms.include? "pubsub.topics.publish" #=> false

Parameters:

  • permissions (String, Array<String>)

    The set of permissions to check access for. Permissions with wildcards (such as * or storage.*) are not allowed.

    The permissions that can be checked on a topic are:

    • pubsub.topics.publish
    • pubsub.topics.attachSubscription
    • pubsub.topics.get
    • pubsub.topics.delete
    • pubsub.topics.update
    • pubsub.topics.getIamPolicy
    • pubsub.topics.setIamPolicy

Returns:

  • (Array<String>)

    The permissions that have access.

# File 'lib/google/cloud/pubsub/topic.rb', line 404

def test_permissions *permissions
  permissions = Array(permissions).flatten
  ensure_service!
  grpc = service.test_topic_permissions name, permissions
  grpc.permissions
end
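
Because the splat argument is flattened, permissions can be passed either as separate arguments or as a single array. The sketch below isolates that normalization step; normalize_permissions is a hypothetical name used only for illustration.

```ruby
# Same normalization as in the method above, isolated for illustration.
def normalize_permissions *permissions
  Array(permissions).flatten
end

splat = normalize_permissions("pubsub.topics.get", "pubsub.topics.publish")
array = normalize_permissions(["pubsub.topics.get", "pubsub.topics.publish"])
puts splat == array # prints true: both call styles yield the same flat list
```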