Class: Google::Cloud::Pubsub::Subscription
- Inherits: Object
  - Object
  - Google::Cloud::Pubsub::Subscription
- Defined in: lib/google/cloud/pubsub/subscription.rb, lib/google/cloud/pubsub/subscription/list.rb
Overview
Subscription
A named resource representing the stream of messages from a single, specific Topic, to be delivered to the subscribing application.
Defined Under Namespace
Classes: List
Instance Method Summary
- #acknowledge(*messages) ⇒ Object (also: #ack)
  Acknowledges receipt of a message.
- #create_snapshot(snapshot_name = nil) ⇒ Google::Cloud::Pubsub::Snapshot (also: #new_snapshot)
  Creates a new Snapshot from the subscription.
- #deadline ⇒ Object
  This value is the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.
- #delay(new_deadline, *messages) ⇒ Object
  Modifies the acknowledge deadline for messages.
- #delete ⇒ Boolean
  Deletes an existing subscription.
- #endpoint ⇒ Object
  Returns the URL locating the endpoint to which messages should be pushed.
- #endpoint=(new_endpoint) ⇒ Object
  Sets the URL locating the endpoint to which messages should be pushed.
- #exists? ⇒ Boolean
  Determines whether the subscription exists in the Pub/Sub service.
- #listen(max: 100, autoack: false, delay: 1) {|msg| ... } ⇒ Object
  Poll the backend for new messages.
- #name ⇒ Object
  The name of the subscription.
- #policy {|policy| ... } ⇒ Policy
  Gets the Cloud IAM access control policy for this subscription.
- #policy=(new_policy) ⇒ Policy
  Updates the Cloud IAM access control policy for this subscription.
- #pull(immediate: true, max: 100, autoack: false) ⇒ Array<Google::Cloud::Pubsub::ReceivedMessage>
  Pulls messages from the server.
- #retain_acked ⇒ Boolean
  Indicates whether to retain acknowledged messages.
- #retention ⇒ Numeric
  How long to retain unacknowledged messages in the subscription's backlog, from the moment a message is published.
- #seek(snapshot) ⇒ Boolean
  Resets the subscription's backlog to a given Snapshot or to a point in time, whichever is provided in the request.
- #test_permissions(*permissions) ⇒ Array<String>
  Tests the specified permissions against the Cloud IAM access control policy.
- #topic ⇒ Topic
  The Topic from which this subscription receives messages.
- #wait_for_messages(max: 100, autoack: false) ⇒ Array<Google::Cloud::Pubsub::ReceivedMessage>
  Pulls from the server while waiting for messages to become available.
Instance Method Details
#acknowledge(*messages) ⇒ Object Also known as: ack
Acknowledges receipt of a message. After an ack, the Pub/Sub system can remove the message from the subscription. Acknowledging a message whose ack deadline has expired may succeed, although the message may have been sent again. Acknowledging a message more than once will not result in an error. This is only used for messages received via pull.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 370
    def acknowledge *messages
      ack_ids = coerce_ack_ids messages
      return true if ack_ids.empty?
      ensure_service!
      service.acknowledge name, *ack_ids
      true
    end
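The two behaviors described above (an empty call returns early, and repeated acknowledgements are harmless) can be sketched without the service. This is a minimal stand-in under stated assumptions: `FakeAcker`, `service_calls`, and `acked` are hypothetical names for illustration and are not part of google-cloud-pubsub.

```ruby
require "set"

# Hypothetical stand-in; `service_calls` counts simulated round trips.
class FakeAcker
  attr_reader :service_calls, :acked

  def initialize
    @service_calls = 0
    @acked = Set.new
  end

  def acknowledge *ack_ids
    ack_ids = ack_ids.flatten
    return true if ack_ids.empty? # nothing to send, so skip the service call
    @service_calls += 1
    @acked.merge ack_ids          # repeated IDs are harmless in a set
    true
  end
  alias ack acknowledge
end

acker = FakeAcker.new
acker.acknowledge            # no IDs: returns early, no service call
acker.ack "ack-1", "ack-2"
acker.ack "ack-1"            # acknowledging again is not an error
```

The set only models the "no error on repeat" property; it says nothing about whether the real service redelivers a message whose deadline already expired.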
#create_snapshot(snapshot_name = nil) ⇒ Google::Cloud::Pubsub::Snapshot Also known as: new_snapshot
Creates a new Google::Cloud::Pubsub::Snapshot from the subscription. The created snapshot is guaranteed to retain:
- The existing backlog on the subscription. More precisely, this is defined as the messages in the subscription's backlog that are unacknowledged upon the successful completion of the create_snapshot operation; as well as:
- Any messages published to the subscription's topic following the successful completion of the create_snapshot operation.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 450
    def create_snapshot snapshot_name = nil
      ensure_service!
      grpc = service.create_snapshot name, snapshot_name
      Snapshot.from_grpc grpc, service
    end
#deadline ⇒ Object
This value is the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 96
    def deadline
      ensure_grpc!
      @grpc.ack_deadline_seconds
    end
#delay(new_deadline, *messages) ⇒ Object
Modifies the acknowledge deadline for messages.
This indicates that more time is needed to process the messages, or to make the messages available for redelivery if the processing was interrupted.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 403
    def delay new_deadline, *messages
      ack_ids = coerce_ack_ids messages
      ensure_service!
      service.modify_ack_deadline name, ack_ids, new_deadline
      true
    end
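The two uses of `#delay` described above (asking for more time, or releasing messages for redelivery) can be illustrated with a small in-memory model. `FakeDeadlines` is a hypothetical class, not the library's; treating a deadline of 0 as "make available for redelivery now" follows the Pub/Sub ModifyAckDeadline behavior and is stated here as an assumption about the service, not something this page documents.

```ruby
# Hypothetical stand-in that tracks a per-message deadline in seconds.
class FakeDeadlines
  attr_reader :deadlines

  def initialize default_deadline, ack_ids
    @deadlines = ack_ids.map { |id| [id, default_deadline] }.to_h
  end

  # Same shape as #delay: set a new deadline for the given messages.
  def delay new_deadline, *ack_ids
    ack_ids.flatten.each { |id| @deadlines[id] = new_deadline }
    true
  end
end

tracker = FakeDeadlines.new 60, %w[ack-1 ack-2 ack-3]
tracker.delay 120, "ack-1", "ack-2" # ask for more processing time
tracker.delay 0, "ack-3"            # assumption: 0 releases the message
```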
#delete ⇒ Boolean
Deletes an existing subscription. All pending messages in the subscription are immediately dropped.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 203
    def delete
      ensure_service!
      service.delete_subscription name
      true
    end
#endpoint ⇒ Object
Returns the URL locating the endpoint to which messages should be pushed.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 134
    def endpoint
      ensure_grpc!
      @grpc.push_config.push_endpoint if @grpc.push_config
    end
#endpoint=(new_endpoint) ⇒ Object
Sets the URL locating the endpoint to which messages should be pushed.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 141
    def endpoint= new_endpoint
      ensure_service!
      service.modify_push_config name, new_endpoint, {}
      @grpc.push_config = Google::Pubsub::V1::PushConfig.new(
        push_endpoint: new_endpoint,
        attributes: {}
      ) if @grpc
    end
#exists? ⇒ Boolean
Determines whether the subscription exists in the Pub/Sub service.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 161
    def exists?
      # Always true if we have a grpc object
      return true unless @grpc.nil?
      # If we have a value, return it
      return @exists unless @exists.nil?
      ensure_grpc!
      @exists = !@grpc.nil?
    rescue Google::Cloud::NotFoundError
      @exists = false
    end
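The source above caches three states: resource loaded, lookup result known, and not yet checked. A minimal sketch of that pattern follows, assuming a hypothetical `LazyResource` class, a `loader` callable, and a local `NotFoundError`; none of these names come from the library.

```ruby
# Local error class used only by this sketch.
class NotFoundError < StandardError; end

# Hypothetical stand-in showing the tri-state cache used by #exists?.
class LazyResource
  attr_reader :lookups

  # `loader` is an illustrative callable that returns a resource object
  # or raises NotFoundError.
  def initialize loader
    @loader = loader
    @resource = nil
    @exists = nil
    @lookups = 0
  end

  def exists?
    return true unless @resource.nil?  # already loaded
    return @exists unless @exists.nil? # cached true/false
    @lookups += 1
    @resource = @loader.call
    @exists = !@resource.nil?
  rescue NotFoundError
    @exists = false
  end
end

missing = LazyResource.new(-> { raise NotFoundError })
present = LazyResource.new(-> { :subscription })
missing_results = [missing.exists?, missing.exists?]
present_results = [present.exists?, present.exists?]
```

In both cases the loader runs once; the second call is answered from the cached state.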
#listen(max: 100, autoack: false, delay: 1) {|msg| ... } ⇒ Object
Poll the backend for new messages. This runs a loop to ping the API, blocking indefinitely, yielding retrieved messages as they are received.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 339
    def listen max: 100, autoack: false, delay: 1
      loop do
        msgs = wait_for_messages max: max, autoack: autoack
        if msgs.any?
          msgs.each { |msg| yield msg }
        else
          sleep delay
        end
      end
    end
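The polling pattern described above can be sketched without the service. This is a hypothetical in-memory stand-in: `FakeSubscription`, its batch queue, the `sleeps` counter, and the `max_polls` bound are all illustration-only assumptions. The bound exists so the example terminates, whereas `#listen` as documented blocks indefinitely.

```ruby
# Hypothetical in-memory stand-in for a subscription; not the library class.
class FakeSubscription
  attr_reader :sleeps

  def initialize batches
    @batches = batches.dup # each element is one poll's worth of messages
    @sleeps = 0
  end

  # Stand-in for the blocking pull: returns the next batch, or [] when empty.
  def wait_for_messages max: 100, autoack: false
    (@batches.shift || []).first max
  end

  # Same shape as #listen, but bounded by max_polls (an assumption added
  # here so that the sketch terminates).
  def listen max: 100, autoack: false, delay: 1, max_polls: 3
    max_polls.times do
      msgs = wait_for_messages max: max, autoack: autoack
      if msgs.any?
        msgs.each { |msg| yield msg }
      else
        @sleeps += 1 # the real loop would call `sleep delay` here
      end
    end
  end
end

received = []
sub = FakeSubscription.new [["a", "b"], [], ["c"]]
sub.listen(delay: 0) { |msg| received << msg }
```

Messages are yielded one at a time, and the loop only backs off on an empty poll.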
#name ⇒ Object
The name of the subscription.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 70
    def name
      @grpc ? @grpc.name : @name
    end
#policy {|policy| ... } ⇒ Policy
Gets the Cloud IAM access control policy for this subscription.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 543
    def policy
      ensure_service!
      grpc = service.get_subscription_policy name
      policy = Policy.from_grpc grpc
      return policy unless block_given?
      yield policy
      self.policy = policy
    end
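The source above implements a read-modify-write: without a block the policy is returned, and with a block it is yielded and then written back. A minimal sketch of that control flow follows, using hypothetical stand-ins (`FakePolicy`, `FakePolicyHolder`, and the `writes` counter are not library names, and the role and member strings are placeholders).

```ruby
# Hypothetical stand-ins; names are illustrative, not the library's classes.
FakePolicy = Struct.new(:etag, :roles)

class FakePolicyHolder
  attr_reader :writes

  def initialize
    @stored = FakePolicy.new("etag-1", {})
    @writes = 0
  end

  # Without a block: return a copy of the stored policy.
  # With a block: yield the copy, then write it back via #policy=.
  def policy
    current = FakePolicy.new(@stored.etag, @stored.roles.dup)
    return current unless block_given?
    yield current
    self.policy = current
  end

  def policy= new_policy
    @writes += 1
    @stored = new_policy
  end
end

holder = FakePolicyHolder.new
holder.policy { |p| p.roles["roles/viewer"] = ["user:viewer@example.com"] }
```

Reading without a block performs no write, so callers who only inspect the policy do not trigger an update.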
#policy=(new_policy) ⇒ Policy
Updates the Cloud IAM access control policy for this subscription. The policy should be read from #policy. See Policy for an explanation of the policy etag property and how to modify policies.
You can also update the policy by passing a block to #policy, which will call this method internally after the block completes.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 581
    def policy= new_policy
      ensure_service!
      grpc = service.set_subscription_policy name, new_policy.to_grpc
      Policy.from_grpc grpc
    end
#pull(immediate: true, max: 100, autoack: false) ⇒ Array<Google::Cloud::Pubsub::ReceivedMessage>
Pulls messages from the server. Returns an empty list if there are no messages available in the backlog. Raises an ApiError with status UNAVAILABLE if there are too many concurrent pull requests pending for the given subscription.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 253
    def pull immediate: true, max: 100, autoack: false
      ensure_service!
      options = { immediate: immediate, max: max }
      list_grpc = service.pull name, options
      messages = Array(list_grpc.received_messages).map do |msg_grpc|
        ReceivedMessage.from_grpc msg_grpc, self
      end
      acknowledge messages if autoack
      messages
    rescue Google::Cloud::DeadlineExceededError
      []
    end
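Two details of the source above are easy to miss: `autoack` acknowledges the batch before returning it, and a deadline error is rescued and turned into an empty array. The sketch below models both with hypothetical stand-ins (`FakePuller`, its `backlog`, the `fail_with_deadline` flag, and the local `DeadlineExceededError` are assumptions for illustration only).

```ruby
# Local error class used only by this sketch.
class DeadlineExceededError < StandardError; end

# Hypothetical stand-in for illustration; not the library's class.
class FakePuller
  attr_reader :acked

  # `backlog` holds pending messages; `fail_with_deadline` simulates a timeout.
  def initialize backlog, fail_with_deadline: false
    @backlog = backlog.dup
    @fail_with_deadline = fail_with_deadline
    @acked = []
  end

  def acknowledge *messages
    @acked.concat messages.flatten
    true
  end

  def pull max: 100, autoack: false
    raise DeadlineExceededError if @fail_with_deadline
    messages = @backlog.shift max    # take at most `max` messages
    acknowledge messages if autoack
    messages
  rescue DeadlineExceededError
    []                               # a timeout looks like an empty pull
  end
end

puller = FakePuller.new %w[m1 m2 m3]
first_batch = puller.pull max: 2, autoack: true
second_batch = puller.pull max: 2
timed_out = FakePuller.new(%w[m1], fail_with_deadline: true).pull
```

Because a timeout and an empty backlog both yield `[]`, callers cannot tell them apart from the return value alone.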
#retain_acked ⇒ Boolean
Indicates whether to retain acknowledged messages. If true, then messages are not expunged from the subscription's backlog, even if they are acknowledged, until they fall out of the #retention window. Default is false.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 110
    def retain_acked
      ensure_grpc!
      @grpc.retain_acked_messages
    end
#retention ⇒ Numeric
How long to retain unacknowledged messages in the subscription's backlog, from the moment a message is published. If #retain_acked is true, then this also configures the retention of acknowledged messages, and thus configures how far back in time a #seek can be done. Cannot be more than 604,800 seconds (7 days) or less than 600 seconds (10 minutes). Default is 604,800 seconds (7 days).
    # File 'lib/google/cloud/pubsub/subscription.rb', line 126
    def retention
      ensure_grpc!
      duration_to_number @grpc.message_retention_duration
    end
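The documented bounds can be checked with plain arithmetic. The helpers below are hypothetical (`valid_retention?` and `earliest_seek_time` are not library methods); they only restate the limits given above and the link between retention and how far back a seek can reach.

```ruby
# Bounds stated in the documentation above, in seconds.
MIN_RETENTION = 600      # 10 minutes
MAX_RETENTION = 604_800  # 7 days

# Hypothetical helper (not part of the library): true when `seconds`
# falls inside the documented retention window.
def valid_retention? seconds
  seconds.between? MIN_RETENTION, MAX_RETENTION
end

# Hypothetical helper: the earliest time a seek could reach, given that
# acknowledged messages are retained for `retention` seconds.
def earliest_seek_time now, retention
  now - retention
end
```

For example, with the default retention, `earliest_seek_time(Time.now, MAX_RETENTION)` is exactly seven days before the current time.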
#seek(snapshot) ⇒ Boolean
Resets the subscription's backlog to a given Google::Cloud::Pubsub::Snapshot or to a point in time, whichever is provided in the request.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 503
    def seek snapshot
      ensure_service!
      service.seek name, snapshot
      true
    end
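How a snapshot and a seek interact can be modelled in memory. This is a deliberately simplified, hypothetical model (`FakeBacklog` is not the library's implementation): it covers only seeking to a snapshot, not to a point in time, and it ignores messages published after the snapshot was taken.

```ruby
# Hypothetical in-memory model; not the library's implementation.
class FakeBacklog
  attr_reader :unacked

  def initialize messages
    @unacked = messages.dup
  end

  def acknowledge *messages
    @unacked -= messages.flatten
    true
  end

  # A snapshot records the messages that are unacknowledged right now.
  def create_snapshot
    @unacked.dup.freeze
  end

  # Seeking restores the backlog recorded in the snapshot.
  def seek snapshot
    @unacked = snapshot.dup
    true
  end
end

backlog = FakeBacklog.new %w[m1 m2 m3]
backlog.acknowledge "m1"
snapshot = backlog.create_snapshot  # captures m2 and m3
backlog.acknowledge "m2", "m3"
drained = backlog.unacked.dup       # backlog is empty at this point
backlog.seek snapshot               # m2 and m3 are pending again
```

Messages acknowledged before the snapshot (here `m1`) stay acknowledged after the seek.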
#test_permissions(*permissions) ⇒ Array<String>
Tests the specified permissions against the Cloud IAM access control policy.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 619
    def test_permissions *permissions
      permissions = Array(permissions).flatten
      ensure_service!
      grpc = service.test_subscription_permissions name, permissions
      grpc.permissions
    end
#topic ⇒ Topic
The Topic from which this subscription receives messages.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 87
    def topic
      ensure_grpc!
      Topic.new_lazy @grpc.topic, service
    end
#wait_for_messages(max: 100, autoack: false) ⇒ Array<Google::Cloud::Pubsub::ReceivedMessage>
Pulls from the server while waiting for messages to become available. This is the same as:
subscription.pull immediate: false
    # File 'lib/google/cloud/pubsub/subscription.rb', line 289
    def wait_for_messages max: 100, autoack: false
      pull immediate: false, max: max, autoack: autoack
    end