Class: Google::Cloud::Pubsub::Subscription

Inherits:
Object
Defined in:
lib/google/cloud/pubsub/subscription.rb,
lib/google/cloud/pubsub/subscription/list.rb

Overview

Subscription

A named resource representing the stream of messages from a single, specific Topic, to be delivered to the subscribing application.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end

# Start background threads that will call the block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop.wait!

Defined Under Namespace

Classes: List

Instance Method Details

#acknowledge(*messages) ⇒ Object Also known as: ack

Acknowledges receipt of a message. After an ack, the Pub/Sub system can remove the message from the subscription. Acknowledging a message whose ack deadline has expired may succeed, although the message may have been sent again. Acknowledging a message more than once will not result in an error. This is only used for messages received via pull.

See also ReceivedMessage#acknowledge!.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
received_messages = sub.pull
sub.acknowledge received_messages

Parameters:

  • messages (ReceivedMessage, String)

    One or more ReceivedMessage objects or ack_id values.



# File 'lib/google/cloud/pubsub/subscription.rb', line 425

def acknowledge *messages
  ack_ids = coerce_ack_ids messages
  return true if ack_ids.empty?
  ensure_service!
  service.acknowledge name, *ack_ids
  true
end
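The listing above relies on a private helper, coerce_ack_ids, whose body is not shown on this page. As a rough sketch of what such a helper might do (an assumption, not the library's code), the following normalizes a mix of message objects, ack_id strings, and nested arrays into a flat list of ack IDs. FakeReceivedMessage and coerce_ack_ids_sketch are hypothetical names used only for illustration.

```ruby
# Hypothetical stand-in for ReceivedMessage; only #ack_id matters here.
FakeReceivedMessage = Struct.new(:ack_id)

# Assumed behavior, not the library's implementation: flatten nested
# arrays, then take #ack_id from message objects and pass strings through.
def coerce_ack_ids_sketch *messages
  messages.flatten.map do |msg|
    msg.respond_to?(:ack_id) ? msg.ack_id : msg.to_s
  end
end

pulled = [FakeReceivedMessage.new("ack-1"), FakeReceivedMessage.new("ack-2")]
ack_ids = coerce_ack_ids_sketch pulled, "ack-3"
```

Under these assumptions, passing the array returned by pull and passing individual ack_id strings would both end up as the same flat list, which may be why sub.acknowledge received_messages works without a splat.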

#create_snapshot(snapshot_name = nil) ⇒ Google::Cloud::Pubsub::Snapshot Also known as: new_snapshot

Creates a new Google::Cloud::Pubsub::Snapshot from the subscription. The created snapshot is guaranteed to retain:

  • The existing backlog on the subscription. More precisely, this is defined as the messages in the subscription's backlog that are unacknowledged upon the successful completion of the create_snapshot operation; as well as:
  • Any messages published to the subscription's topic following the successful completion of the create_snapshot operation.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-sub"

snapshot = sub.create_snapshot "my-snapshot"
snapshot.name #=> "projects/my-project/snapshots/my-snapshot"

Without providing a name:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-sub"

snapshot = sub.create_snapshot
snapshot.name #=> "projects/my-project/snapshots/gcr-analysis-..."

Parameters:

  • snapshot_name (String, nil) (defaults to: nil)

    Name of the new snapshot. If the name is not provided, the server will assign a random name for this snapshot on the same project as the subscription. The format is projects/{project}/snapshots/{snap}. The name must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and it must not start with "goog". Optional.

Returns:

  • (Google::Cloud::Pubsub::Snapshot)



# File 'lib/google/cloud/pubsub/subscription.rb', line 508

def create_snapshot snapshot_name = nil
  ensure_service!
  grpc = service.create_snapshot name, snapshot_name
  Snapshot.from_grpc grpc, service
end
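The naming rules for snapshot_name can be checked before making the API call. The sketch below is one possible client-side pre-check; valid_snapshot_id? and SNAPSHOT_ID_PATTERN are hypothetical names, and it assumes the rules apply to the final {snap} segment rather than to the full projects/{project}/snapshots/{snap} path.

```ruby
# Assumption: the rules describe the {snap} segment only.
# One leading letter plus 2 to 254 further allowed characters gives a
# total length of 3 to 255.
SNAPSHOT_ID_PATTERN = /\A[A-Za-z][A-Za-z0-9\-_.~+%]{2,254}\z/

# Hypothetical helper; the library does not expose a method by this name.
def valid_snapshot_id? id
  return false unless id.is_a? String
  return false if id.start_with? "goog"
  SNAPSHOT_ID_PATTERN.match? id
end

valid_snapshot_id? "my-snapshot" # satisfies all three rules
valid_snapshot_id? "goog-backup" # rejected because of the reserved prefix
```

The service remains the authority on what it accepts, so a check like this would only serve to fail fast on obviously invalid names.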

#deadline ⇒ Object

This value is the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.



# File 'lib/google/cloud/pubsub/subscription.rb', line 95

def deadline
  ensure_grpc!
  @grpc.ack_deadline_seconds
end

#deadline=(new_deadline) ⇒ Object



# File 'lib/google/cloud/pubsub/subscription.rb', line 100

def deadline= new_deadline
  update_grpc = @grpc.dup
  update_grpc.ack_deadline_seconds = new_deadline
  @grpc = service.update_subscription update_grpc,
                                      :ack_deadline_seconds
  @lazy = nil
end
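The setter above follows a copy, modify, update pattern: it duplicates the cached resource, changes one field on the copy, and sends the copy to the service together with the name of the changed field. The sketch below replays that flow against in-memory stand-ins so the effect can be observed without a Pub/Sub connection. FakeSubscriptionGrpc, RecordingService, and SubscriptionSketch are hypothetical; only the shape of deadline= mirrors the listing.

```ruby
# Hypothetical stand-in for the gRPC subscription resource.
FakeSubscriptionGrpc = Struct.new(:name, :ack_deadline_seconds)

# Hypothetical service that records which fields were updated and
# echoes the resource back, as a real update call might.
class RecordingService
  attr_reader :updated_fields

  def initialize
    @updated_fields = []
  end

  def update_subscription grpc, *fields
    @updated_fields.concat fields
    grpc
  end
end

class SubscriptionSketch
  def initialize grpc, service
    @grpc = grpc
    @service = service
  end

  def deadline
    @grpc.ack_deadline_seconds
  end

  # Same shape as the listing: the cached resource is only replaced
  # with whatever the service returns.
  def deadline= new_deadline
    update_grpc = @grpc.dup
    update_grpc.ack_deadline_seconds = new_deadline
    @grpc = @service.update_subscription update_grpc, :ack_deadline_seconds
  end
end

original = FakeSubscriptionGrpc.new "projects/my-project/subscriptions/my-sub", 60
sub = SubscriptionSketch.new original, RecordingService.new
sub.deadline = 120
```

Working on a copy means the cached state stays untouched if the service call raises, which is a plausible motivation for the dup in the listing.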

#delay(new_deadline, *messages) ⇒ Object Also known as: modify_ack_deadline

Modifies the acknowledge deadline for messages.

This indicates that more time is needed to process the messages, or to make the messages available for redelivery if the processing was interrupted.

See also ReceivedMessage#delay!.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
received_messages = sub.pull
sub.delay 120, received_messages

Parameters:

  • new_deadline (Integer)

    The new ack deadline in seconds from the time this request is sent to the Pub/Sub system. Must be >= 0. For example, if the value is 10, the new ack deadline will expire 10 seconds after the call is made. Specifying 0 may immediately make the message available for another pull request.

  • messages (ReceivedMessage, String)

    One or more ReceivedMessage objects or ack_id values.



# File 'lib/google/cloud/pubsub/subscription.rb', line 460

def delay new_deadline, *messages
  ack_ids = coerce_ack_ids messages
  ensure_service!
  service.modify_ack_deadline name, ack_ids, new_deadline
  true
end

#delete ⇒ Boolean

Deletes an existing subscription. All pending messages in the subscription are immediately dropped.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
sub.delete

Returns:

  • (Boolean)

    Returns true if the subscription was deleted.



# File 'lib/google/cloud/pubsub/subscription.rb', line 230

def delete
  ensure_service!
  service.delete_subscription name
  true
end

#endpoint ⇒ Object

Returns the URL locating the endpoint to which messages should be pushed.



# File 'lib/google/cloud/pubsub/subscription.rb', line 158

def endpoint
  ensure_grpc!
  @grpc.push_config.push_endpoint if @grpc.push_config
end

#endpoint=(new_endpoint) ⇒ Object

Sets the URL locating the endpoint to which messages should be pushed.



# File 'lib/google/cloud/pubsub/subscription.rb', line 165

def endpoint= new_endpoint
  ensure_service!
  service.modify_push_config name, new_endpoint, {}

  return unless @grpc

  @grpc.push_config = Google::Pubsub::V1::PushConfig.new(
    push_endpoint: new_endpoint,
    attributes: {}
  )
end

#exists? ⇒ Boolean

Determines whether the subscription exists in the Pub/Sub service.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
sub.exists? #=> true

Returns:

  • (Boolean)


# File 'lib/google/cloud/pubsub/subscription.rb', line 188

def exists?
  # Always true if the object is not set as lazy
  return true unless lazy?
  # If we have a value, return it
  return @exists unless @exists.nil?
  ensure_grpc!
  @exists = true
rescue Google::Cloud::NotFoundError
  @exists = false
end
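The listing caches a three-state value: nil means "not checked yet", while true and false are both final answers. That is why it tests @exists.nil? instead of using ||=, which would repeat the lookup every time the answer is false. A minimal sketch of the same idea follows, with hypothetical names (LazyResourceSketch, NotFoundSketchError) and an in-memory lookup in place of the service call.

```ruby
# Hypothetical error standing in for Google::Cloud::NotFoundError.
class NotFoundSketchError < StandardError; end

class LazyResourceSketch
  attr_reader :lookups

  def initialize(found:)
    @found = found
    @exists = nil # nil means the lookup has not happened yet
    @lookups = 0
  end

  def exists?
    # A cached false is a valid answer, so compare against nil.
    return @exists unless @exists.nil?
    load!
    @exists = true
  rescue NotFoundSketchError
    @exists = false
  end

  private

  # Stand-in for the remote lookup; counts how often it runs.
  def load!
    @lookups += 1
    raise NotFoundSketchError unless @found
  end
end

missing = LazyResourceSketch.new found: false
missing.exists?
missing.exists? # answered from the cached value
```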

#listen(deadline: nil, streams: nil, inventory: nil, threads: {}) {|received_message| ... } ⇒ Subscriber

Create a Google::Cloud::Pubsub::Subscriber object that receives and processes messages using the code provided in the callback.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end

# Start background threads that will call the block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop.wait!

Configuring to increase concurrent callbacks:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen threads: { callback: 16 } do |rec_message|
  # store the message somewhere before acknowledging
  store_in_backend rec_message.data # takes a few seconds
  rec_message.acknowledge!
end

# Start background threads that will call the block passed to listen.
subscriber.start

Parameters:

  • deadline (Numeric)

    The default number of seconds the stream will hold received messages before modifying the message's ack deadline. The minimum is 10, the maximum is 600. Default is #deadline. Optional.

  • streams (Integer)

    The number of concurrent streams to open to pull messages from the subscription. Default is 4. Optional.

  • inventory (Integer)

    The number of received messages to be collected by subscriber. Default is 1,000. Optional.

  • threads (Hash)

    The number of threads to create to handle concurrent calls by each stream opened by the subscriber. Optional.

    Hash keys and values may include the following:

Yields:

  • (received_message)

    a block for processing new messages

Yield Parameters:

  • received_message (ReceivedMessage)

Returns:

  • (Subscriber)



# File 'lib/google/cloud/pubsub/subscription.rb', line 393

def listen deadline: nil, streams: nil, inventory: nil, threads: {},
           &block
  ensure_service!
  deadline ||= self.deadline

  Subscriber.new name, block, deadline: deadline, streams: streams,
                              inventory: inventory, threads: threads,
                              service: service
end
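To give a feel for what raising the callback thread count buys, the sketch below runs a block over a list of messages on a fixed pool of worker threads. It uses only the Ruby standard Queue and Thread classes; process_concurrently is a hypothetical helper and is not how Subscriber is implemented.

```ruby
# Hypothetical helper, not the library's Subscriber: runs the block on a
# fixed pool of worker threads, each handling one message at a time.
# Messages must be truthy, because a nil from #pop ends a worker's loop.
def process_concurrently messages, callback_threads: 4, &block
  queue = Queue.new
  messages.each { |msg| queue << msg }
  queue.close # once drained, #pop returns nil instead of blocking

  workers = Array.new(callback_threads) do
    Thread.new do
      while (msg = queue.pop)
        block.call msg
      end
    end
  end
  workers.each(&:join)
end

handled = Queue.new
process_concurrently((1..20).to_a, callback_threads: 16) do |msg|
  handled << msg * 2 # Queue is thread-safe, so workers can share it
end
```

With slow callbacks, such as the store_in_backend call in the example above, more workers allow more messages to be in flight at once. Completion order is not guaranteed.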

#name ⇒ Object

The name of the subscription.



# File 'lib/google/cloud/pubsub/subscription.rb', line 69

def name
  @grpc.name
end

#policy {|policy| ... } ⇒ Policy

Gets the Cloud IAM access control policy for this subscription.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-subscription"

policy = sub.policy

Update the policy by passing a block:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-subscription"

sub.policy do |p|
  p.add "roles/owner", "user:owner@example.com"
end

Yields:

  • (policy)

    A block for updating the policy. The latest policy will be read from the Pub/Sub service and passed to the block. After the block completes, the modified policy will be written to the service.

Yield Parameters:

  • policy (Policy)

    the current Cloud IAM Policy for this subscription

Returns:

  • (Policy)

    the current Cloud IAM Policy for this subscription

# File 'lib/google/cloud/pubsub/subscription.rb', line 601

def policy
  ensure_service!
  grpc = service.get_subscription_policy name
  policy = Policy.from_grpc grpc
  return policy unless block_given?
  yield policy
  update_policy policy
end
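The method above doubles as a reader and as a read, modify, write helper, depending on whether a block is given. The following sketch reproduces that control flow with an in-memory store. PolicyHolderSketch is hypothetical, and a policy is modeled here as a plain Hash of role to members rather than a Policy object.

```ruby
# Hypothetical in-memory stand-in for a subscription's IAM policy storage.
class PolicyHolderSketch
  attr_reader :writes

  def initialize roles = {}
    @stored = roles
    @writes = 0
  end

  # Without a block: read only. With a block: read, yield, then write back.
  def policy
    current = @stored.transform_values(&:dup)
    return current unless block_given?
    yield current
    update_policy current
  end

  def update_policy new_policy
    @writes += 1
    @stored = new_policy
  end
end

holder = PolicyHolderSketch.new({ "roles/viewer" => ["user:viewer@example.com"] })
holder.policy do |p|
  (p["roles/owner"] ||= []) << "user:owner@example.com"
end
```

In this model a plain read never triggers a write, while the block form always writes once after the block returns, matching the two branches of the listing.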

#pull(immediate: true, max: 100) ⇒ Array<Google::Cloud::Pubsub::ReceivedMessage>

Pulls messages from the server. Returns an empty list if there are no messages available in the backlog. Raises an ApiError with status UNAVAILABLE if there are too many concurrent pull requests pending for the given subscription.

See also #listen for the preferred way to process messages as they become available.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
sub.pull.each { |received_message| received_message.acknowledge! }

A maximum number of messages returned can also be specified:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
sub.pull(max: 10).each do |received_message|
  received_message.acknowledge!
end

The call can block until messages are available:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
received_messages = sub.pull immediate: false
received_messages.each do |received_message|
  received_message.acknowledge!
end

Parameters:

  • immediate (Boolean)

    When true, the system will respond immediately even if it is not able to return messages. When false, the system is allowed to wait until it can return at least one message. No messages are returned when a request times out. The default value is true.

    See also #listen for the preferred way to process messages as they become available.

  • max (Integer)

    The maximum number of messages to return for this request. The Pub/Sub system may return fewer than the number specified. The default value is 100, the maximum value is 1000.

Returns:

  • (Array<Google::Cloud::Pubsub::ReceivedMessage>)



# File 'lib/google/cloud/pubsub/subscription.rb', line 288

def pull immediate: true, max: 100
  ensure_service!
  options = { immediate: immediate, max: max }
  list_grpc = service.pull name, options
  Array(list_grpc.received_messages).map do |msg_grpc|
    ReceivedMessage.from_grpc msg_grpc, self
  end
rescue Google::Cloud::DeadlineExceededError
  []
end
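Because pull returns an empty array when nothing is available, a simple batch loop can stop on the first empty result. The sketch below shows one way to write such a loop. drain_backlog and FakeSubscription are hypothetical; the helper only assumes an object that responds to pull(immediate:, max:) and acknowledge as documented on this page. Note that an empty immediate pull from the real service does not prove the backlog is empty, so this is an illustration rather than a reliable drain.

```ruby
# Hypothetical helper: pulls and acknowledges in batches until a pull
# comes back empty, and returns how many messages were handled.
def drain_backlog sub, batch_size: 100
  total = 0
  loop do
    batch = sub.pull immediate: true, max: batch_size
    break if batch.empty?
    sub.acknowledge batch
    total += batch.size
  end
  total
end

# Hypothetical in-memory subscription used to exercise the helper.
class FakeSubscription
  attr_reader :acked

  def initialize messages
    @backlog = messages.dup
    @acked = []
  end

  def pull immediate: true, max: 100
    @backlog.shift max
  end

  def acknowledge *messages
    @acked.concat messages.flatten
    true
  end
end

fake = FakeSubscription.new (1..250).map { |i| "msg-#{i}" }
handled = drain_backlog fake, batch_size: 100
```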

#retain_acked ⇒ Boolean

Indicates whether to retain acknowledged messages. If true, then messages are not expunged from the subscription's backlog, even if they are acknowledged, until they fall out of the #retention window. Default is false.

Returns:

  • (Boolean)

    Returns true if acknowledged messages are retained.



# File 'lib/google/cloud/pubsub/subscription.rb', line 117

def retain_acked
  ensure_grpc!
  @grpc.retain_acked_messages
end

#retain_acked=(new_retain_acked) ⇒ Object



# File 'lib/google/cloud/pubsub/subscription.rb', line 122

def retain_acked= new_retain_acked
  update_grpc = @grpc.dup
  update_grpc.retain_acked_messages = !(!new_retain_acked)
  @grpc = service.update_subscription update_grpc,
                                      :retain_acked_messages
  @lazy = nil
end

#retention ⇒ Numeric

How long to retain unacknowledged messages in the subscription's backlog, from the moment a message is published. If #retain_acked is true, then this also configures the retention of acknowledged messages, and thus configures how far back in time a #seek can be done. Cannot be more than 604,800 seconds (7 days) or less than 600 seconds (10 minutes). Default is 604,800 seconds (7 days).

Returns:

  • (Numeric)

    The message retention duration in seconds.



# File 'lib/google/cloud/pubsub/subscription.rb', line 141

def retention
  ensure_grpc!
  Convert.duration_to_number @grpc.message_retention_duration
end

#retention=(new_retention) ⇒ Object



# File 'lib/google/cloud/pubsub/subscription.rb', line 146

def retention= new_retention
  update_grpc = @grpc.dup
  update_grpc.message_retention_duration = \
    Convert.number_to_duration new_retention
  @grpc = service.update_subscription update_grpc,
                                      :message_retention_duration
  @lazy = nil
end
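The setter converts a number of seconds into a duration before sending it. The body of Convert.number_to_duration is not shown on this page, so the following is only a guess at the arithmetic involved: split the value into whole seconds and nanoseconds, after checking the bounds documented for #retention. retention_to_duration_sketch and both constants are hypothetical, and the result is a plain Hash rather than a protobuf Duration.

```ruby
MIN_RETENTION_SECONDS = 600      # 10 minutes, per the documented minimum
MAX_RETENTION_SECONDS = 604_800  # 7 days, per the documented maximum

# Hypothetical helper: validates the range, then splits the value into
# whole seconds and the fractional remainder expressed in nanoseconds.
def retention_to_duration_sketch seconds
  unless seconds.between? MIN_RETENTION_SECONDS, MAX_RETENTION_SECONDS
    raise ArgumentError, "retention must be between 600 and 604800 seconds"
  end
  whole = seconds.floor
  nanos = ((seconds - whole) * 1_000_000_000).round
  { seconds: whole, nanos: nanos }
end

retention_to_duration_sketch 86_400 # one day
retention_to_duration_sketch 600.5  # ten minutes and half a second
```

The range check here is a client-side convenience under the stated assumptions; the service is what ultimately enforces the limits.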

#seek(snapshot) ⇒ Boolean

Resets the subscription's backlog to a given Google::Cloud::Pubsub::Snapshot or to a point in time, whichever is provided in the request.

Examples:

Using a snapshot:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-sub"

snapshot = sub.create_snapshot

received_messages = sub.pull
sub.acknowledge received_messages

sub.seek snapshot

Using a time:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-sub"

time = Time.now

received_messages = sub.pull
sub.acknowledge received_messages

sub.seek time

Parameters:

  • snapshot (Snapshot, String, Time)

    The Snapshot instance, snapshot name, or time to which to perform the seek. If the argument is a snapshot, the snapshot's topic must be the same as that of the subscription. If it is a time, messages retained in the subscription that were published before this time are marked as acknowledged, and messages retained in the subscription that were published after this time are marked as unacknowledged. Note that this operation affects only those messages retained in the subscription. For example, if the time corresponds to a point before the message retention window (or to a point before the system's notion of the subscription creation time), only retained messages will be marked as unacknowledged, and already-expunged messages will not be restored.

Returns:

  • (Boolean)

    Returns true if the seek was successful.



# File 'lib/google/cloud/pubsub/subscription.rb', line 561

def seek snapshot
  ensure_service!
  service.seek name, snapshot
  true
end
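The effect of seeking to a time can be modeled in a few lines. The sketch below applies the documented rule to an in-memory list of retained messages: anything published before the given time becomes acknowledged, anything published after it becomes unacknowledged, and messages that are no longer retained are simply absent. RetainedMessage, seek_to_time_sketch, and backlog_after_seek are hypothetical names, and the handling of a message published exactly at the seek time is an assumption.

```ruby
# Hypothetical model of a retained message; not a library class.
RetainedMessage = Struct.new(:id, :publish_time, :acked)

# Marks messages published before time as acknowledged and later ones as
# unacknowledged. A message published exactly at time is treated as
# unacknowledged here; that tie-break is an assumption.
def seek_to_time_sketch retained, time
  retained.each { |msg| msg.acked = msg.publish_time < time }
end

# IDs of the messages that would be delivered again after the seek.
def backlog_after_seek retained
  retained.reject(&:acked).map(&:id)
end

retained = [
  RetainedMessage.new("m1", Time.at(100), true),
  RetainedMessage.new("m2", Time.at(200), true),
  RetainedMessage.new("m3", Time.at(300), false)
]
seek_to_time_sketch retained, Time.at(150)
backlog_after_seek retained
```

In this model, seeking to a time earlier than every retained message just marks all of them unacknowledged; nothing that has already been expunged comes back.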

#test_permissions(*permissions) ⇒ Array<String>

Tests the specified permissions against the Cloud IAM access control policy.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-subscription"
perms = sub.test_permissions "pubsub.subscriptions.get",
                             "pubsub.subscriptions.consume"
perms.include? "pubsub.subscriptions.get" #=> true
perms.include? "pubsub.subscriptions.consume" #=> false

Parameters:

  • permissions (String, Array<String>)

    The set of permissions to check access for. Permissions with wildcards (such as * or storage.*) are not allowed.

    The permissions that can be checked on a subscription are:

    • pubsub.subscriptions.consume
    • pubsub.subscriptions.get
    • pubsub.subscriptions.delete
    • pubsub.subscriptions.update
    • pubsub.subscriptions.getIamPolicy
    • pubsub.subscriptions.setIamPolicy

Returns:

  • (Array<String>)

    The permissions that have access.

# File 'lib/google/cloud/pubsub/subscription.rb', line 678

def test_permissions *permissions
  permissions = Array(permissions).flatten
  ensure_service!
  grpc = service.test_subscription_permissions name, permissions
  grpc.permissions
end
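Since the method flattens its arguments, permissions can be passed either as separate strings or as arrays. The sketch below mirrors that flattening and adds a client-side guard for the documented ban on wildcards. normalize_permissions is a hypothetical helper, and the guard is an assumption about where such a check could live; the listing itself does not perform it.

```ruby
# Hypothetical helper: flattens the arguments the same way the listing
# does, then rejects any permission containing a wildcard character.
def normalize_permissions *permissions
  list = permissions.flatten.map(&:to_s)
  wildcard = list.find { |perm| perm.include? "*" }
  raise ArgumentError, "wildcards are not allowed: #{wildcard}" if wildcard
  list
end

normalize_permissions ["pubsub.subscriptions.get"],
                      "pubsub.subscriptions.consume"
```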

#topic ⇒ Topic

The Topic from which this subscription receives messages.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
sub.topic.name #=> "projects/my-project/topics/my-topic"

Returns:

  • (Topic)



# File 'lib/google/cloud/pubsub/subscription.rb', line 86

def topic
  ensure_grpc!
  Topic.new_lazy @grpc.topic, service
end

#update_policy(new_policy) ⇒ Policy Also known as: policy=

Updates the Cloud IAM access control policy for this subscription. The policy should be read from #policy. See Policy for an explanation of the policy etag property and how to modify policies.

You can also update the policy by passing a block to #policy, which will call this method internally after the block completes.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-subscription"

policy = sub.policy # API call

policy.add "roles/owner", "user:owner@example.com"

sub.update_policy policy # API call

Parameters:

  • new_policy (Policy)

    a new or modified Cloud IAM Policy for this subscription

Returns:

  • (Policy)

    the policy returned by the API update operation

# File 'lib/google/cloud/pubsub/subscription.rb', line 639

def update_policy new_policy
  ensure_service!
  grpc = service.set_subscription_policy name, new_policy.to_grpc
  Policy.from_grpc grpc
end

#wait_for_messages(max: 100) ⇒ Array<Google::Cloud::Pubsub::ReceivedMessage>

Pulls from the server while waiting for messages to become available. This is the same as:

subscription.pull immediate: false

See also #listen for the preferred way to process messages as they become available.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
received_messages = sub.wait_for_messages
received_messages.each do |received_message|
  received_message.acknowledge!
end

Parameters:

  • max (Integer)

    The maximum number of messages to return for this request. The Pub/Sub system may return fewer than the number specified. The default value is 100, the maximum value is 1000.

Returns:

  • (Array<Google::Cloud::Pubsub::ReceivedMessage>)



# File 'lib/google/cloud/pubsub/subscription.rb', line 325

def wait_for_messages max: 100
  pull immediate: false, max: max
end