Class: Google::Cloud::Pubsub::Subscription

Inherits:
Object
Defined in:
lib/google/cloud/pubsub/subscription.rb,
lib/google/cloud/pubsub/subscription/list.rb

Overview

Subscription

A named resource representing the stream of messages from a single, specific Topic, to be delivered to the subscribing application.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
msgs = sub.pull
msgs.each { |msg| msg.acknowledge! }

Defined Under Namespace

Classes: List

Instance Method Details

#acknowledge(*messages) ⇒ Object Also known as: ack

Acknowledges receipt of a message. After an ack, the Pub/Sub system can remove the message from the subscription. Acknowledging a message whose ack deadline has expired may succeed, although the message may have been sent again. Acknowledging a message more than once will not result in an error. This is only used for messages received via pull.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
messages = sub.pull
sub.acknowledge messages

Parameters:

  • messages (ReceivedMessage, String)

    One or more ReceivedMessage objects or ack_id values.



# File 'lib/google/cloud/pubsub/subscription.rb', line 370

def acknowledge *messages
  ack_ids = coerce_ack_ids messages
  return true if ack_ids.empty?
  ensure_service!
  service.acknowledge name, *ack_ids
  true
end
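
The source above relies on a private helper, coerce_ack_ids, whose body is not shown on this page. As a rough sketch of what such a helper might do, it could flatten the splatted arguments and accept either objects that respond to ack_id or plain ack ID strings. The helper body, FakeMessage, and collect_ack_ids below are assumptions for illustration, not the library's code:

```ruby
# Stand-in for ReceivedMessage; only the ack_id reader matters here.
FakeMessage = Struct.new(:ack_id)

# Hypothetical version of the private helper used by #acknowledge and #delay.
def coerce_ack_ids messages
  Array(messages).flatten.map do |msg|
    msg.respond_to?(:ack_id) ? msg.ack_id : msg.to_s
  end
end

# *messages wraps an array argument in another array, hence the flatten.
def collect_ack_ids *messages
  coerce_ack_ids messages
end

ids = collect_ack_ids [FakeMessage.new("ack-1"), FakeMessage.new("ack-2")], "ack-3"
puts ids.inspect
```

Under these assumptions, passing the array returned by pull and passing individual messages or ack IDs would both produce the same flat list of ack IDs.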

#create_snapshot(snapshot_name = nil) ⇒ Google::Cloud::Pubsub::Snapshot Also known as: new_snapshot

Creates a new Google::Cloud::Pubsub::Snapshot from the subscription. The created snapshot is guaranteed to retain:

  • The existing backlog on the subscription, defined as the messages in the subscription's backlog that are unacknowledged upon the successful completion of the create_snapshot operation.
  • Any messages published to the subscription's topic following the successful completion of the create_snapshot operation.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-sub"

snapshot = sub.create_snapshot "my-snapshot"
snapshot.name #=> "projects/my-project/snapshots/my-snapshot"

Without providing a name:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-sub"

snapshot = sub.create_snapshot
snapshot.name #=> "projects/my-project/snapshots/gcr-analysis-..."

Parameters:

  • snapshot_name (String, nil) (defaults to: nil)

    Name of the new snapshot. If the name is not provided, the server will assign a random name for this snapshot in the same project as the subscription. The format is projects/{project}/snapshots/{snap}. The name must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus signs (+), or percent signs (%). It must be between 3 and 255 characters in length, and it must not start with "goog". Optional.
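
The naming rules above can be checked locally before calling create_snapshot. The following is a minimal sketch, assuming the rules apply to the final {snap} segment and that the "goog" prefix check is case-sensitive. valid_snapshot_id? and SNAPSHOT_ID_PATTERN are hypothetical names, not part of the library, and the service remains the authority on what it accepts:

```ruby
# Hypothetical pattern: one leading letter, then 2 to 254 allowed characters,
# for a total length of 3 to 255.
SNAPSHOT_ID_PATTERN = /\A[A-Za-z][A-Za-z0-9\-_.~+%]{2,254}\z/

# Hypothetical client-side check of the documented naming rules.
def valid_snapshot_id? id
  return false unless id.is_a? String
  return false if id.start_with? "goog"
  SNAPSHOT_ID_PATTERN.match? id
end

puts valid_snapshot_id?("my-snapshot")
puts valid_snapshot_id?("goog-snapshot")
```

A check like this only saves a round trip for obviously invalid names.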

Returns:

  • (Google::Cloud::Pubsub::Snapshot)



# File 'lib/google/cloud/pubsub/subscription.rb', line 450

def create_snapshot snapshot_name = nil
  ensure_service!
  grpc = service.create_snapshot name, snapshot_name
  Snapshot.from_grpc grpc, service
end

#deadline ⇒ Object

This value is the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.



# File 'lib/google/cloud/pubsub/subscription.rb', line 96

def deadline
  ensure_grpc!
  @grpc.ack_deadline_seconds
end

#delay(new_deadline, *messages) ⇒ Object

Modifies the acknowledge deadline for messages.

Use this to indicate that more time is needed to process the messages, or to make the messages available for redelivery if processing was interrupted.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
messages = sub.pull
sub.delay 120, messages

Parameters:

  • new_deadline (Integer)

    The new ack deadline in seconds from the time this request is sent to the Pub/Sub system. Must be >= 0. For example, if the value is 10, the new ack deadline will expire 10 seconds after the call is made. Specifying 0 may immediately make the message available for another pull request.

  • messages (ReceivedMessage, String)

    One or more ReceivedMessage objects or ack_id values.



# File 'lib/google/cloud/pubsub/subscription.rb', line 403

def delay new_deadline, *messages
  ack_ids = coerce_ack_ids messages
  ensure_service!
  service.modify_ack_deadline name, ack_ids, new_deadline
  true
end

#delete ⇒ Boolean

Deletes an existing subscription. All pending messages in the subscription are immediately dropped.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
sub.delete

Returns:

  • (Boolean)

    Returns true if the subscription was deleted.



# File 'lib/google/cloud/pubsub/subscription.rb', line 203

def delete
  ensure_service!
  service.delete_subscription name
  true
end

#endpoint ⇒ Object

Returns the URL locating the endpoint to which messages should be pushed.



# File 'lib/google/cloud/pubsub/subscription.rb', line 134

def endpoint
  ensure_grpc!
  @grpc.push_config.push_endpoint if @grpc.push_config
end

#endpoint=(new_endpoint) ⇒ Object

Sets the URL locating the endpoint to which messages should be pushed.



# File 'lib/google/cloud/pubsub/subscription.rb', line 141

def endpoint= new_endpoint
  ensure_service!
  service.modify_push_config name, new_endpoint, {}
  @grpc.push_config = Google::Pubsub::V1::PushConfig.new(
    push_endpoint: new_endpoint,
    attributes: {}
  ) if @grpc
end

#exists? ⇒ Boolean

Determines whether the subscription exists in the Pub/Sub service.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
sub.exists? #=> true

Returns:

  • (Boolean)


# File 'lib/google/cloud/pubsub/subscription.rb', line 161

def exists?
  # Always true if we have a grpc object
  return true unless @grpc.nil?
  # If we have a value, return it
  return @exists unless @exists.nil?
  ensure_grpc!
  @exists = !@grpc.nil?
rescue Google::Cloud::NotFoundError
  @exists = false
end

#listen(max: 100, autoack: false, delay: 1) {|msg| ... } ⇒ Object

Poll the backend for new messages. This runs a loop to ping the API, blocking indefinitely, yielding retrieved messages as they are received.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
sub.listen do |msg|
  # process msg
end

Limit number of messages pulled per API request with max:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
sub.listen max: 20 do |msg|
  # process msg
end

Automatically acknowledge messages with autoack:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
sub.listen autoack: true do |msg|
  # process msg
end

Parameters:

  • max (Integer)

    The maximum number of messages to return for this request. The Pub/Sub system may return fewer than the number specified. The default value is 100, the maximum value is 1000.

  • autoack (Boolean)

    Automatically acknowledge the message as it is pulled. The default value is false.

  • delay (Number)

    The number of seconds to pause between requests when the Google Cloud service has no messages to return. The default value is 1.

Yields:

  • (msg)

    a block for processing new messages

Yield Parameters:

  • msg (ReceivedMessage)

    a received message



# File 'lib/google/cloud/pubsub/subscription.rb', line 339

def listen max: 100, autoack: false, delay: 1
  loop do
    msgs = wait_for_messages max: max, autoack: autoack
    if msgs.any?
      msgs.each { |msg| yield msg }
    else
      sleep delay
    end
  end
end
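
Because the loop above never returns on its own, callers need a way to stop it. As a sketch, the block can use break, which returns control from the method that yielded. FakeSubscription below is an in-memory stand-in written for this example; only the loop shape is taken from the source above:

```ruby
# In-memory stand-in for a subscription; not the real client.
class FakeSubscription
  def initialize batches
    @batches = batches
  end

  # Returns the next queued batch, or an empty array when nothing is queued.
  def wait_for_messages max: 100, autoack: false
    (@batches.shift || []).first max
  end

  # Same loop shape as the #listen source shown above.
  def listen max: 100, autoack: false, delay: 1
    loop do
      msgs = wait_for_messages max: max, autoack: autoack
      if msgs.any?
        msgs.each { |msg| yield msg }
      else
        sleep delay
      end
    end
  end
end

sub = FakeSubscription.new [["a", "b"], [], ["c", "stop", "d"]]
seen = []
sub.listen delay: 0.01 do |msg|
  break if msg == "stop" # break returns control from #listen
  seen << msg
end
puts seen.inspect
```

Messages that follow the break in the same batch (here "d") are never yielded, so with autoack disabled they would stay unacknowledged and eligible for redelivery.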

#name ⇒ Object

The name of the subscription.



# File 'lib/google/cloud/pubsub/subscription.rb', line 70

def name
  @grpc ? @grpc.name : @name
end

#policy {|policy| ... } ⇒ Policy

Gets the Cloud IAM access control policy for this subscription.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-subscription"

policy = sub.policy

Update the policy by passing a block:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-subscription"

sub.policy do |p|
  p.add "roles/owner", "user:owner@example.com"
end

Yields:

  • (policy)

    A block for updating the policy. The latest policy will be read from the Pub/Sub service and passed to the block. After the block completes, the modified policy will be written to the service.

Yield Parameters:

  • policy (Policy)

    the current Cloud IAM Policy for this subscription

Returns:

  • (Policy)

    the current Cloud IAM Policy for this subscription

# File 'lib/google/cloud/pubsub/subscription.rb', line 543

def policy
  ensure_service!
  grpc = service.get_subscription_policy name
  policy = Policy.from_grpc grpc
  return policy unless block_given?
  yield policy
  self.policy = policy
end

#policy=(new_policy) ⇒ Policy

Updates the Cloud IAM access control policy for this subscription. The policy should be read from #policy. See Policy for an explanation of the policy etag property and how to modify policies.

You can also update the policy by passing a block to #policy, which will call this method internally after the block completes.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-subscription"

policy = sub.policy # API call

policy.add "roles/owner", "user:owner@example.com"

sub.policy = policy # API call

Parameters:

  • new_policy (Policy)

    a new or modified Cloud IAM Policy for this subscription

Returns:

  • (Policy)

    the policy returned by the API update operation

# File 'lib/google/cloud/pubsub/subscription.rb', line 581

def policy= new_policy
  ensure_service!
  grpc = service.set_subscription_policy name, new_policy.to_grpc
  Policy.from_grpc grpc
end

#pull(immediate: true, max: 100, autoack: false) ⇒ Array<Google::Cloud::Pubsub::ReceivedMessage>

Pulls messages from the server. Returns an empty list if there are no messages available in the backlog. Raises an ApiError with status UNAVAILABLE if there are too many concurrent pull requests pending for the given subscription.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
sub.pull.each { |msg| msg.acknowledge! }

A maximum number of messages returned can also be specified:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
sub.pull(max: 10).each { |msg| msg.acknowledge! }

The call can block until messages are available:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
msgs = sub.pull immediate: false
msgs.each { |msg| msg.acknowledge! }

Parameters:

  • immediate (Boolean)

    When true, the system will respond immediately even if it is not able to return messages. When false, the system is allowed to wait until it can return at least one message. No messages are returned when a request times out. The default value is true.

  • max (Integer)

    The maximum number of messages to return for this request. The Pub/Sub system may return fewer than the number specified. The default value is 100, the maximum value is 1000.

  • autoack (Boolean)

    Automatically acknowledge the message as it is pulled. The default value is false.

Returns:

  • (Array<Google::Cloud::Pubsub::ReceivedMessage>)



# File 'lib/google/cloud/pubsub/subscription.rb', line 253

def pull immediate: true, max: 100, autoack: false
  ensure_service!
  options = { immediate: immediate, max: max }
  list_grpc = service.pull name, options
  messages = Array(list_grpc.received_messages).map do |msg_grpc|
    ReceivedMessage.from_grpc msg_grpc, self
  end
  acknowledge messages if autoack
  messages
rescue Google::Cloud::DeadlineExceededError
  []
end

#retain_acked ⇒ Boolean

Indicates whether to retain acknowledged messages. If true, then messages are not expunged from the subscription's backlog, even if they are acknowledged, until they fall out of the #retention window. Default is false.

Returns:

  • (Boolean)

    Returns true if acknowledged messages are retained.



# File 'lib/google/cloud/pubsub/subscription.rb', line 110

def retain_acked
  ensure_grpc!
  @grpc.retain_acked_messages
end

#retention ⇒ Numeric

How long to retain unacknowledged messages in the subscription's backlog, from the moment a message is published. If #retain_acked is true, then this also configures the retention of acknowledged messages, and thus configures how far back in time a #seek can be done. Cannot be more than 604,800 seconds (7 days) or less than 600 seconds (10 minutes). Default is 604,800 seconds (7 days).

Returns:

  • (Numeric)

    The message retention duration in seconds.
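
The limits quoted above follow from simple arithmetic on seconds. A small sketch, in which the constant names and valid_retention? are hypothetical and the service enforces the real limits:

```ruby
SECONDS_PER_DAY = 24 * 60 * 60

MIN_RETENTION = 10 * 60              # 600 seconds (10 minutes)
MAX_RETENTION = 7 * SECONDS_PER_DAY  # 604,800 seconds (7 days)

# Hypothetical client-side range check for a retention value in seconds.
def valid_retention? seconds
  seconds.is_a?(Numeric) && seconds.between?(MIN_RETENTION, MAX_RETENTION)
end

puts MAX_RETENTION
puts valid_retention?(3 * SECONDS_PER_DAY)
```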



# File 'lib/google/cloud/pubsub/subscription.rb', line 126

def retention
  ensure_grpc!
  duration_to_number @grpc.message_retention_duration
end

#seek(snapshot) ⇒ Boolean

Resets the subscription's backlog to a given Google::Cloud::Pubsub::Snapshot or to a point in time, whichever is provided in the request.

Examples:

Using a snapshot:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-sub"

snapshot = sub.create_snapshot

messages = sub.pull
sub.acknowledge messages

sub.seek snapshot

Using a time:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-sub"

time = Time.now

messages = sub.pull
sub.acknowledge messages

sub.seek time

Parameters:

  • snapshot (Snapshot, String, Time)

    The Snapshot instance, snapshot name, or time to which to perform the seek. If the argument is a snapshot, the snapshot's topic must be the same as that of the subscription. If it is a time, messages retained in the subscription that were published before this time are marked as acknowledged, and messages retained in the subscription that were published after this time are marked as unacknowledged. Note that this operation affects only those messages retained in the subscription. For example, if the time corresponds to a point before the message retention window (or to a point before the system's notion of the subscription creation time), only retained messages will be marked as unacknowledged, and already-expunged messages will not be restored.
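
The effect of a time-based seek can be modeled in memory. This is only an illustration of the semantics described above, not how the service is implemented. RetainedMessage and seek_to_time are hypothetical names, and treating a message published exactly at the target time as unacknowledged is an assumption, since the description covers only "before" and "after":

```ruby
# In-memory model of a retained backlog entry; not a library class.
RetainedMessage = Struct.new(:data, :published_at, :acked)

# Models a time-based seek over retained messages only: anything published
# before the target time becomes acknowledged, the rest unacknowledged.
def seek_to_time retained, time
  retained.each { |msg| msg.acked = msg.published_at < time }
end

t0 = Time.at 1_000
backlog = [
  RetainedMessage.new("old", t0, false),
  RetainedMessage.new("recent", t0 + 60, true),
  RetainedMessage.new("new", t0 + 120, true)
]

seek_to_time backlog, t0 + 30
pending = backlog.reject(&:acked).map(&:data)
puts pending.inspect
```

Messages that have already been expunged are simply absent from the retained list, which is why seeking to an earlier time cannot bring them back.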

Returns:

  • (Boolean)

    Returns true if the seek was successful.



# File 'lib/google/cloud/pubsub/subscription.rb', line 503

def seek snapshot
  ensure_service!
  service.seek name, snapshot
  true
end

#test_permissions(*permissions) ⇒ Array<String>

Tests the specified permissions against the Cloud IAM access control policy.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-subscription"
perms = sub.test_permissions "pubsub.subscriptions.get",
                             "pubsub.subscriptions.consume"
perms.include? "pubsub.subscriptions.get" #=> true
perms.include? "pubsub.subscriptions.consume" #=> false

Parameters:

  • permissions (String, Array<String>)

    The set of permissions to check access for. Permissions with wildcards (such as * or storage.*) are not allowed.

    The permissions that can be checked on a subscription are:

    • pubsub.subscriptions.consume
    • pubsub.subscriptions.get
    • pubsub.subscriptions.delete
    • pubsub.subscriptions.update
    • pubsub.subscriptions.getIamPolicy
    • pubsub.subscriptions.setIamPolicy

Returns:

  • (Array<String>)

    The permissions that have access.

# File 'lib/google/cloud/pubsub/subscription.rb', line 619

def test_permissions *permissions
  permissions = Array(permissions).flatten
  ensure_service!
  grpc = service.test_subscription_permissions name, permissions
  grpc.permissions
end
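
The first line of the method above is what lets callers pass either separate strings or a single array. A short sketch of that normalization, together with a hypothetical client-side wildcard guard; normalize_permissions and wildcard? are illustrative names, and the service performs the real validation:

```ruby
# Mirrors the normalization step in the source above.
def normalize_permissions *permissions
  Array(permissions).flatten
end

# Hypothetical guard for the "no wildcards" rule.
def wildcard? permission
  permission.include? "*"
end

a = normalize_permissions "pubsub.subscriptions.get", "pubsub.subscriptions.consume"
b = normalize_permissions ["pubsub.subscriptions.get", "pubsub.subscriptions.consume"]
puts a == b
puts wildcard?("pubsub.subscriptions.*")
```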

#topic ⇒ Topic

The Topic from which this subscription receives messages.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
sub.topic.name #=> "projects/my-project/topics/my-topic"

Returns:

  • (Topic)



# File 'lib/google/cloud/pubsub/subscription.rb', line 87

def topic
  ensure_grpc!
  Topic.new_lazy @grpc.topic, service
end

#wait_for_messages(max: 100, autoack: false) ⇒ Array<Google::Cloud::Pubsub::ReceivedMessage>

Pulls from the server while waiting for messages to become available. This is the same as:

subscription.pull immediate: false

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
msgs = sub.wait_for_messages
msgs.each { |msg| msg.acknowledge! }

Parameters:

  • max (Integer)

    The maximum number of messages to return for this request. The Pub/Sub system may return fewer than the number specified. The default value is 100, the maximum value is 1000.

  • autoack (Boolean)

    Automatically acknowledge the message as it is pulled. The default value is false.

Returns:

  • (Array<Google::Cloud::Pubsub::ReceivedMessage>)



# File 'lib/google/cloud/pubsub/subscription.rb', line 289

def wait_for_messages max: 100, autoack: false
  pull immediate: false, max: max, autoack: autoack
end