Class: Google::Cloud::Pubsub::Subscription

Inherits:
Object
  • Object
show all
Defined in:
lib/google/cloud/pubsub/subscription.rb,
lib/google/cloud/pubsub/subscription/list.rb

Overview

Subscription

A named resource representing the stream of messages from a single, specific Topic, to be delivered to the subscribing application.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
msgs = sub.pull
msgs.each { |msg| msg.acknowledge! }

Defined Under Namespace

Classes: List

Instance Method Summary collapse

Instance Method Details

#acknowledge(*messages) ⇒ Object Also known as: ack

Acknowledges receipt of a message. After an ack, the Pub/Sub system can remove the message from the subscription. Acknowledging a message whose ack deadline has expired may succeed, although the message may have been sent again. Acknowledging a message more than once will not result in an error. This is only used for messages received via pull.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
messages = sub.pull
sub.acknowledge messages

Parameters:



397
398
399
400
401
402
403
# File 'lib/google/cloud/pubsub/subscription.rb', line 397

def acknowledge *messages
  ack_ids = coerce_ack_ids messages
  return true if ack_ids.empty?
  ensure_service!
  service.acknowledge name, *ack_ids
  true
end

#create_snapshot(snapshot_name = nil) ⇒ Google::Cloud::Pubsub::Snapshot Also known as: new_snapshot

Creates a new Google::Cloud::Pubsub::Snapshot from the subscription. The created snapshot is guaranteed to retain:

  • The existing backlog on the subscription. More precisely, this is defined as the messages in the subscription's backlog that are unacknowledged upon the successful completion of the create_snapshot operation; as well as:
  • Any messages published to the subscription's topic following the successful completion of the create_snapshot operation.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-sub"

snapshot = sub.create_snapshot "my-snapshot"
snapshot.name #=> "projects/my-project/snapshots/my-snapshot"

Without providing a name:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-sub"

snapshot = sub.create_snapshot
snapshot.name #=> "projects/my-project/snapshots/gcr-analysis-..."

Parameters:

  • snapshot_name (String, nil) (defaults to: nil)

    Name of the new snapshot. If the name is not provided, the server will assign a random name for this snapshot on the same project as the subscription. The format is projects/{project}/snapshots/{snap}. The name must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9], dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and it must not start with "goog". Optional.

Returns:



478
479
480
481
482
# File 'lib/google/cloud/pubsub/subscription.rb', line 478

def create_snapshot snapshot_name = nil
  ensure_service!
  grpc = service.create_snapshot name, snapshot_name
  Snapshot.from_grpc grpc, service
end

#deadlineObject

This value is the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.



87
88
89
90
# File 'lib/google/cloud/pubsub/subscription.rb', line 87

def deadline
  ensure_grpc!
  @grpc.ack_deadline_seconds
end

#deadline=(new_deadline) ⇒ Object



92
93
94
95
96
97
98
99
# File 'lib/google/cloud/pubsub/subscription.rb', line 92

def deadline= new_deadline
  update_grpc = @grpc.dup
  update_grpc.ack_deadline_seconds = new_deadline
  @grpc = service.update_subscription update_grpc,
                                      :ack_deadline_seconds
  @lazy = nil
  self
end

#delay(new_deadline, *messages) ⇒ Object Also known as: modify_ack_deadline

Modifies the acknowledge deadline for messages.

This indicates that more time is needed to process the messages, or to make the messages available for redelivery if the processing was interrupted.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
messages = sub.pull
sub.delay 120, messages

Parameters:

  • new_deadline (Integer)

    The new ack deadline in seconds from the time this request is sent to the Pub/Sub system. Must be >= 0. For example, if the value is 10, the new ack deadline will expire 10 seconds after the call is made. Specifying 0 may immediately make the message available for another pull request.

  • messages (ReceivedMessage, String)

    One or more ReceivedMessage objects or ack_id values.



430
431
432
433
434
435
# File 'lib/google/cloud/pubsub/subscription.rb', line 430

def delay new_deadline, *messages
  ack_ids = coerce_ack_ids messages
  ensure_service!
  service.modify_ack_deadline name, ack_ids, new_deadline
  true
end

#deleteBoolean

Deletes an existing subscription. All pending messages in the subscription are immediately dropped.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
sub.delete

Returns:

  • (Boolean)

    Returns true if the subscription was deleted.



222
223
224
225
226
# File 'lib/google/cloud/pubsub/subscription.rb', line 222

def delete
  ensure_service!
  service.delete_subscription name
  true
end

#endpointObject

Returns the URL locating the endpoint to which messages should be pushed.



153
154
155
156
# File 'lib/google/cloud/pubsub/subscription.rb', line 153

def endpoint
  ensure_grpc!
  @grpc.push_config.push_endpoint if @grpc.push_config
end

#endpoint=(new_endpoint) ⇒ Object

Sets the URL locating the endpoint to which messages should be pushed.



160
161
162
163
164
165
166
167
# File 'lib/google/cloud/pubsub/subscription.rb', line 160

def endpoint= new_endpoint
  ensure_service!
  service.modify_push_config name, new_endpoint, {}
  @grpc.push_config = Google::Pubsub::V1::PushConfig.new(
    push_endpoint: new_endpoint,
    attributes: {}
  ) if @grpc
end

#exists?Boolean

Determines whether the subscription exists in the Pub/Sub service.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
sub.exists? #=> true

Returns:

  • (Boolean)


180
181
182
183
184
185
186
187
188
189
# File 'lib/google/cloud/pubsub/subscription.rb', line 180

def exists?
  # Always true if the object is not set as lazy
  return true unless lazy?
  # If we have a value, return it
  return @exists unless @exists.nil?
  ensure_grpc!
  @exists = true
rescue Google::Cloud::NotFoundError
  @exists = false
end

#listen(deadline: nil, streams: nil, inventory: nil, threads: {}) {|msg| ... } ⇒ Subscriber

Create a Google::Cloud::Pubsub::Subscriber object that receives and processes messages using the code provided in the callback.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen do |msg|
  # process msg
  msg.ack!
end

subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop.wait!

Configuring to increase concurrent callbacks:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen threads: { callback: 16 } do |msg|
  # store the message somewhere before acknowledging
  store_in_backend msg.data # takes a few seconds
  msg.ack!
end

subscriber.start

Parameters:

  • deadline (Numeric)

    The default number of seconds the stream will hold received messages before modifying the message's ack deadline. The minimum is 10, the maximum is 600. Default is #deadline. Optional.

  • streams (Integer)

    The number of concurrent streams to open to pull messages from the subscription. Default is 4. Optional.

  • inventory (Integer)

    The number of received messages to be collected by subscriber. Default is 1,000. Optional.

  • threads (Hash)

    The number of threads to create to handle concurrent calls by each stream opened by the subscriber. Optional.

    Hash keys and values may include the following:

Yields:

  • (msg)

    a block for processing new messages

Yield Parameters:

Returns:



367
368
369
370
371
372
373
374
375
# File 'lib/google/cloud/pubsub/subscription.rb', line 367

def listen deadline: nil, streams: nil, inventory: nil, threads: {},
           &block
  ensure_service!
  deadline ||= self.deadline

  Subscriber.new name, block, deadline: deadline, streams: streams,
                              inventory: inventory, threads: threads,
                              service: service
end

#nameObject

The name of the subscription.



61
62
63
# File 'lib/google/cloud/pubsub/subscription.rb', line 61

def name
  @grpc.name
end

#policy {|policy| ... } ⇒ Policy

Gets the Cloud IAM access control policy for this subscription.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-subscription"

policy = sub.policy

Update the policy by passing a block:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-subscription"

sub.policy do |p|
  p.add "roles/owner", "user:owner@example.com"
end

Yields:

  • (policy)

    A block for updating the policy. The latest policy will be read from the Pub/Sub service and passed to the block. After the block completes, the modified policy will be written to the service.

Yield Parameters:

  • policy (Policy)

    the current Cloud IAM Policy for this subscription

Returns:

  • (Policy)

    the current Cloud IAM Policy for this subscription

See Also:



571
572
573
574
575
576
577
578
# File 'lib/google/cloud/pubsub/subscription.rb', line 571

def policy
  ensure_service!
  grpc = service.get_subscription_policy name
  policy = Policy.from_grpc grpc
  return policy unless block_given?
  yield policy
  self.policy = policy
end

#policy=(new_policy) ⇒ Policy

Updates the Cloud IAM access control policy for this subscription. The policy should be read from #policy. See Policy for an explanation of the policy etag property and how to modify policies.

You can also update the policy by passing a block to #policy, which will call this method internally after the block completes.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-subscription"

policy = sub.policy # API call

policy.add "roles/owner", "user:owner@example.com"

sub.policy = policy # API call

Parameters:

  • new_policy (Policy)

    a new or modified Cloud IAM Policy for this subscription

Returns:

  • (Policy)

    the policy returned by the API update operation

See Also:



609
610
611
612
613
# File 'lib/google/cloud/pubsub/subscription.rb', line 609

def policy= new_policy
  ensure_service!
  grpc = service.set_subscription_policy name, new_policy.to_grpc
  Policy.from_grpc grpc
end

#pull(immediate: true, max: 100) ⇒ Array<Google::Cloud::Pubsub::ReceivedMessage>

Pulls messages from the server. Returns an empty list if there are no messages available in the backlog. Raises an ApiError with status UNAVAILABLE if there are too many concurrent pull requests pending for the given subscription.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
sub.pull.each { |msg| msg.acknowledge! }

A maximum number of messages returned can also be specified:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
sub.pull(max: 10).each { |msg| msg.acknowledge! }

The call can block until messages are available:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
msgs = sub.pull immediate: false
msgs.each { |msg| msg.acknowledge! }

Parameters:

  • immediate (Boolean)

    When true the system will respond immediately even if it is not able to return messages. When false the system is allowed to wait until it can return least one message. No messages are returned when a request times out. The default value is true.

  • max (Integer)

    The maximum number of messages to return for this request. The Pub/Sub system may return fewer than the number specified. The default value is 100, the maximum value is 1000.

Returns:



270
271
272
273
274
275
276
277
278
279
# File 'lib/google/cloud/pubsub/subscription.rb', line 270

def pull immediate: true, max: 100
  ensure_service!
  options = { immediate: immediate, max: max }
  list_grpc = service.pull name, options
  Array(list_grpc.received_messages).map do |msg_grpc|
    ReceivedMessage.from_grpc msg_grpc, self
  end
rescue Google::Cloud::DeadlineExceededError
  []
end

#retain_ackedBoolean

Indicates whether to retain acknowledged messages. If true, then messages are not expunged from the subscription's backlog, even if they are acknowledged, until they fall out of the #retention_duration window. Default is false.

Returns:

  • (Boolean)

    Returns true if acknowledged messages are retained.



110
111
112
113
# File 'lib/google/cloud/pubsub/subscription.rb', line 110

def retain_acked
  ensure_grpc!
  @grpc.retain_acked_messages
end

#retain_acked=(new_retain_acked) ⇒ Object



115
116
117
118
119
120
121
122
# File 'lib/google/cloud/pubsub/subscription.rb', line 115

def retain_acked= new_retain_acked
  update_grpc = @grpc.dup
  update_grpc.retain_acked_messages = !(!new_retain_acked)
  @grpc = service.update_subscription update_grpc,
                                      :retain_acked_messages
  @lazy = nil
  self
end

#retentionNumeric

How long to retain unacknowledged messages in the subscription's backlog, from the moment a message is published. If #retain_acked is true, then this also configures the retention of acknowledged messages, and thus configures how far back in time a #seek can be done. Cannot be more than 604,800 seconds (7 days) or less than 600 seconds (10 minutes). Default is 604,800 seconds (7 days).

Returns:

  • (Numeric)

    The message retention duration in seconds.



135
136
137
138
# File 'lib/google/cloud/pubsub/subscription.rb', line 135

def retention
  ensure_grpc!
  Convert.duration_to_number @grpc.message_retention_duration
end

#retention=(new_retention) ⇒ Object



140
141
142
143
144
145
146
147
148
# File 'lib/google/cloud/pubsub/subscription.rb', line 140

def retention= new_retention
  update_grpc = @grpc.dup
  update_grpc.message_retention_duration = \
    Convert.number_to_duration new_retention
  @grpc = service.update_subscription update_grpc,
                                      :message_retention_duration
  @lazy = nil
  self
end

#seek(snapshot) ⇒ Boolean

Resets the subscription's backlog to a given Google::Cloud::Pubsub::Snapshot or to a point in time, whichever is provided in the request.

Examples:

Using a snapshot

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-sub"

snapshot = sub.create_snapshot

messages = sub.pull
sub.acknowledge messages

sub.seek snapshot

Using a time:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-sub"

time = Time.now

messages = sub.pull
sub.acknowledge messages

sub.seek time

Parameters:

  • snapshot (Snapshot, String, Time)

    The Snapshot instance, snapshot name, or time to which to perform the seek. If the argument is a snapshot, the snapshot's topic must be the same as that of the subscription. If it is a time, messages retained in the subscription that were published before this time are marked as acknowledged, and messages retained in the subscription that were published after this time are marked as unacknowledged. Note that this operation affects only those messages retained in the subscription. For example, if the time corresponds to a point before the message retention window (or to a point before the system's notion of the subscription creation time), only retained messages will be marked as unacknowledged, and already-expunged messages will not be restored.

Returns:

  • (Boolean)

    Returns true if the seek was successful.



531
532
533
534
535
# File 'lib/google/cloud/pubsub/subscription.rb', line 531

def seek snapshot
  ensure_service!
  service.seek name, snapshot
  true
end

#test_permissions(*permissions) ⇒ Array<String>

Tests the specified permissions against the Cloud IAM access control policy.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new
sub = pubsub.subscription "my-subscription"
perms = sub.test_permissions "pubsub.subscriptions.get",
                             "pubsub.subscriptions.consume"
perms.include? "pubsub.subscriptions.get" #=> true
perms.include? "pubsub.subscriptions.consume" #=> false

Parameters:

  • permissions (String, Array<String>)

    The set of permissions to check access for. Permissions with wildcards (such as * or storage.*) are not allowed.

    The permissions that can be checked on a subscription are:

    • pubsub.subscriptions.consume
    • pubsub.subscriptions.get
    • pubsub.subscriptions.delete
    • pubsub.subscriptions.update
    • pubsub.subscriptions.getIamPolicy
    • pubsub.subscriptions.setIamPolicy

Returns:

  • (Array<String>)

    The permissions that have access.

See Also:



647
648
649
650
651
652
# File 'lib/google/cloud/pubsub/subscription.rb', line 647

def test_permissions *permissions
  permissions = Array(permissions).flatten
  ensure_service!
  grpc = service.test_subscription_permissions name, permissions
  grpc.permissions
end

#topicTopic

The Topic from which this subscription receives messages.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
sub.topic.name #=> "projects/my-project/topics/my-topic"

Returns:



78
79
80
81
# File 'lib/google/cloud/pubsub/subscription.rb', line 78

def topic
  ensure_grpc!
  Topic.new_lazy @grpc.topic, service
end

#wait_for_messages(max: 100) ⇒ Array<Google::Cloud::Pubsub::ReceivedMessage>

Pulls from the server while waiting for messages to become available. This is the same as:

subscription.pull immediate: false

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"
msgs = sub.wait_for_messages
msgs.each { |msg| msg.acknowledge! }

Parameters:

  • max (Integer)

    The maximum number of messages to return for this request. The Pub/Sub system may return fewer than the number specified. The default value is 100, the maximum value is 1000.

Returns:



302
303
304
# File 'lib/google/cloud/pubsub/subscription.rb', line 302

def wait_for_messages max: 100
  pull immediate: false, max: max
end