Class: Google::Cloud::Pubsub::Subscription
Inherits: Object
- Object
- Google::Cloud::Pubsub::Subscription
Defined in: lib/google/cloud/pubsub/subscription.rb, lib/google/cloud/pubsub/subscription/list.rb
Overview
Subscription
A named resource representing the stream of messages from a single, specific Topic, to be delivered to the subscribing application.
Defined Under Namespace
Classes: List
Instance Method Summary
-
#acknowledge(*messages) ⇒ Object
(also: #ack)
Acknowledges receipt of a message.
-
#create_snapshot(snapshot_name = nil) ⇒ Google::Cloud::Pubsub::Snapshot
(also: #new_snapshot)
Creates a new Snapshot from the subscription.
-
#deadline ⇒ Object
This value is the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.
- #deadline=(new_deadline) ⇒ Object
-
#delay(new_deadline, *messages) ⇒ Object
(also: #modify_ack_deadline)
Modifies the acknowledge deadline for messages.
-
#delete ⇒ Boolean
Deletes an existing subscription.
-
#endpoint ⇒ Object
Returns the URL locating the endpoint to which messages should be pushed.
-
#endpoint=(new_endpoint) ⇒ Object
Sets the URL locating the endpoint to which messages should be pushed.
-
#exists? ⇒ Boolean
Determines whether the subscription exists in the Pub/Sub service.
-
#listen(deadline: nil, streams: nil, inventory: nil, threads: {}) {|received_message| ... } ⇒ Subscriber
Create a Subscriber object that receives and processes messages using the code provided in the callback.
-
#name ⇒ Object
The name of the subscription.
-
#policy {|policy| ... } ⇒ Policy
Gets the Cloud IAM access control policy for this subscription.
-
#pull(immediate: true, max: 100) ⇒ Array<Google::Cloud::Pubsub::ReceivedMessage>
Pulls messages from the server.
-
#retain_acked ⇒ Boolean
Indicates whether to retain acknowledged messages.
- #retain_acked=(new_retain_acked) ⇒ Object
-
#retention ⇒ Numeric
How long to retain unacknowledged messages in the subscription's backlog, from the moment a message is published.
- #retention=(new_retention) ⇒ Object
-
#seek(snapshot) ⇒ Boolean
Resets the subscription's backlog to a given Snapshot or to a point in time, whichever is provided in the request.
-
#test_permissions(*permissions) ⇒ Array<String>
Tests the specified permissions against the Cloud IAM access control policy.
-
#topic ⇒ Topic
The Topic from which this subscription receives messages.
-
#update_policy(new_policy) ⇒ Policy
(also: #policy=)
Updates the Cloud IAM access control policy for this subscription.
-
#wait_for_messages(max: 100) ⇒ Array<Google::Cloud::Pubsub::ReceivedMessage>
Pulls from the server while waiting for messages to become available.
Instance Method Details
#acknowledge(*messages) ⇒ Object Also known as: ack
Acknowledges receipt of a message. After an ack, the Pub/Sub system can remove the message from the subscription. Acknowledging a message whose ack deadline has expired may succeed, although the message may have been sent again. Acknowledging a message more than once will not result in an error. This is only used for messages received via pull.
See also ReceivedMessage#acknowledge!.
# File 'lib/google/cloud/pubsub/subscription.rb', line 425
def acknowledge *messages
  ack_ids = coerce_ack_ids messages
  return true if ack_ids.empty?
  ensure_service!
  service.acknowledge name, *ack_ids
  true
end
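As a rough illustration of batch acknowledgement, the sketch below pulls a batch, runs a caller-supplied block on each message, and acknowledges only the accepted messages in a single call. The helper name `ack_processed` is hypothetical and not part of the library; it relies only on `#pull` and `#acknowledge` as documented here.

```ruby
# Hypothetical helper (not part of google-cloud-pubsub): pull a batch, run
# the block on each message, and acknowledge in one call only the messages
# for which the block returned a truthy value.
# `subscription` is any object that responds to #pull and #acknowledge.
#
# Assumed usage with the gem installed ("my-sub" is a made-up name):
#   require "google/cloud/pubsub"
#   sub = Google::Cloud::Pubsub.new.subscription "my-sub"
#   ack_processed(sub) { |msg| handle msg }
def ack_processed subscription, max: 10
  received = subscription.pull max: max
  done = received.select { |msg| yield msg }
  subscription.acknowledge(*done) unless done.empty?
  done.size
end
```

Messages the block rejects are left unacknowledged, so the service can redeliver them once their ack deadline expires.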
#create_snapshot(snapshot_name = nil) ⇒ Google::Cloud::Pubsub::Snapshot Also known as: new_snapshot
Creates a new Google::Cloud::Pubsub::Snapshot from the subscription. The created snapshot is guaranteed to retain:
- The existing backlog on the subscription. More precisely, this is defined as the messages in the subscription's backlog that are unacknowledged upon the successful completion of the create_snapshot operation; as well as:
- Any messages published to the subscription's topic following the successful completion of the create_snapshot operation.
# File 'lib/google/cloud/pubsub/subscription.rb', line 508
def create_snapshot snapshot_name = nil
  ensure_service!
  grpc = service.create_snapshot name, snapshot_name
  Snapshot.from_grpc grpc, service
end
#deadline ⇒ Object
This value is the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.
# File 'lib/google/cloud/pubsub/subscription.rb', line 95
def deadline
  ensure_grpc!
  @grpc.ack_deadline_seconds
end
#deadline=(new_deadline) ⇒ Object
# File 'lib/google/cloud/pubsub/subscription.rb', line 100
def deadline= new_deadline
  update_grpc = @grpc.dup
  update_grpc.ack_deadline_seconds = new_deadline
  @grpc = service.update_subscription update_grpc,
                                      :ack_deadline_seconds
  @lazy = nil
end
#delay(new_deadline, *messages) ⇒ Object Also known as: modify_ack_deadline
Modifies the acknowledge deadline for messages.
Use this to indicate that more time is needed to process the messages, or to make the messages available for redelivery if processing was interrupted.
See also ReceivedMessage#delay!.
# File 'lib/google/cloud/pubsub/subscription.rb', line 460
def delay new_deadline, *messages
  ack_ids = coerce_ack_ids messages
  ensure_service!
  service.modify_ack_deadline name, ack_ids, new_deadline
  true
end
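The two uses described above can be sketched as follows: extending the deadline for messages still being processed, and passing a deadline of 0 to release messages for immediate redelivery. The helper `triage` and its keyword arguments are hypothetical; only `#delay` comes from this class.

```ruby
# Hypothetical helper (not part of google-cloud-pubsub): request more time
# for messages still being worked on, and release abandoned messages for
# immediate redelivery by setting their ack deadline to 0 seconds.
# `subscription` is any object that responds to #delay.
def triage subscription, in_progress:, abandoned:, extra_seconds: 60
  subscription.delay(extra_seconds, *in_progress) unless in_progress.empty?
  subscription.delay(0, *abandoned) unless abandoned.empty?
  true
end
```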
#delete ⇒ Boolean
Deletes an existing subscription. All pending messages in the subscription are immediately dropped.
# File 'lib/google/cloud/pubsub/subscription.rb', line 230
def delete
  ensure_service!
  service.delete_subscription name
  true
end
#endpoint ⇒ Object
Returns the URL locating the endpoint to which messages should be pushed.
# File 'lib/google/cloud/pubsub/subscription.rb', line 158
def endpoint
  ensure_grpc!
  @grpc.push_config.push_endpoint if @grpc.push_config
end
#endpoint=(new_endpoint) ⇒ Object
Sets the URL locating the endpoint to which messages should be pushed.
# File 'lib/google/cloud/pubsub/subscription.rb', line 165
def endpoint= new_endpoint
  ensure_service!
  service.modify_push_config name, new_endpoint, {}

  return unless @grpc

  @grpc.push_config = Google::Pubsub::V1::PushConfig.new(
    push_endpoint: new_endpoint,
    attributes: {}
  )
end
#exists? ⇒ Boolean
Determines whether the subscription exists in the Pub/Sub service.
# File 'lib/google/cloud/pubsub/subscription.rb', line 188
def exists?
  # Always true if the object is not set as lazy
  return true unless lazy?
  # If we have a value, return it
  return @exists unless @exists.nil?
  ensure_grpc!
  @exists = true
rescue Google::Cloud::NotFoundError
  @exists = false
end
#listen(deadline: nil, streams: nil, inventory: nil, threads: {}) {|received_message| ... } ⇒ Subscriber
Create a Google::Cloud::Pubsub::Subscriber object that receives and processes messages using the code provided in the callback.
# File 'lib/google/cloud/pubsub/subscription.rb', line 393
def listen deadline: nil, streams: nil, inventory: nil, threads: {},
           &block
  ensure_service!
  deadline ||= self.deadline

  Subscriber.new name, block, deadline: deadline, streams: streams,
                              inventory: inventory, threads: threads,
                              service: service
end
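A minimal sketch of how the callback might be wired up, assuming the returned Subscriber exposes a `#start` method (this page does not document the Subscriber class, so treat that call as an assumption). The helper `start_listener` is hypothetical; `ReceivedMessage#acknowledge!` is the method referenced under #acknowledge.

```ruby
# Hypothetical helper (not part of google-cloud-pubsub): create a Subscriber
# whose callback runs the given handler and then acknowledges the message,
# start it, and return it so the caller can stop it later.
# Assumes the Subscriber returned by #listen responds to #start.
def start_listener subscription, threads: {}, &handler
  subscriber = subscription.listen threads: threads do |received_message|
    handler.call received_message
    received_message.acknowledge!
  end
  subscriber.start
  subscriber
end
```

If the handler raises, the message is not acknowledged in this sketch, so it remains eligible for redelivery.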
#name ⇒ Object
The name of the subscription.
# File 'lib/google/cloud/pubsub/subscription.rb', line 69
def name
  @grpc.name
end
#policy {|policy| ... } ⇒ Policy
Gets the Cloud IAM access control policy for this subscription.
# File 'lib/google/cloud/pubsub/subscription.rb', line 601
def policy
  ensure_service!
  grpc = service.get_subscription_policy name
  policy = Policy.from_grpc grpc
  return policy unless block_given?
  yield policy
  update_policy policy
end
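The block form shown in the listing reads the policy, yields it, and then writes it back through `update_policy`. A minimal sketch of using it, assuming the yielded Policy object provides an `add(role, member)` method; the helper `grant_subscriber` is hypothetical.

```ruby
# Hypothetical helper (not part of google-cloud-pubsub): grant a member the
# Pub/Sub subscriber role using the block form of #policy, which fetches the
# policy, yields it for modification, and saves it when the block returns.
# Assumes the yielded policy responds to #add(role, member).
def grant_subscriber subscription, member
  subscription.policy do |policy|
    policy.add "roles/pubsub.subscriber", member
  end
end
```

The return value is whatever `#policy` returns after the update, which per the listing is the Policy returned by `update_policy`.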
#pull(immediate: true, max: 100) ⇒ Array<Google::Cloud::Pubsub::ReceivedMessage>
Pulls messages from the server. Returns an empty list if there are no messages available in the backlog. Raises an ApiError with status UNAVAILABLE if there are too many concurrent pull requests pending for the given subscription.
See also #listen for the preferred way to process messages as they become available.
# File 'lib/google/cloud/pubsub/subscription.rb', line 288
def pull immediate: true, max: 100
  ensure_service!
  options = { immediate: immediate, max: max }
  list_grpc = service.pull name, options
  Array(list_grpc.received_messages).map do |msg_grpc|
    ReceivedMessage.from_grpc msg_grpc, self
  end
rescue Google::Cloud::DeadlineExceededError
  []
end
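Because an immediate pull returns an empty list when the backlog is empty, a simple loop can drain a subscription. The sketch below is one way to do that; the helper `drain` is hypothetical and uses only `#pull` and `#acknowledge` from this class.

```ruby
# Hypothetical helper (not part of google-cloud-pubsub): repeatedly pull
# batches with immediate: true, hand each message to the block, acknowledge
# the batch, and stop at the first empty pull. Returns the number of
# messages handled.
def drain subscription, batch: 100
  total = 0
  loop do
    messages = subscription.pull immediate: true, max: batch
    break if messages.empty?
    messages.each { |msg| yield msg }
    subscription.acknowledge(*messages)
    total += messages.size
  end
  total
end
```

An empty pull does not guarantee the backlog is permanently empty, so this suits batch jobs better than long-running consumers, for which #listen is the preferred approach.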
#retain_acked ⇒ Boolean
Indicates whether to retain acknowledged messages. If true, then messages are not expunged from the subscription's backlog, even if they are acknowledged, until they fall out of the #retention window. Default is false.
# File 'lib/google/cloud/pubsub/subscription.rb', line 117
def retain_acked
  ensure_grpc!
  @grpc.retain_acked_messages
end
#retain_acked=(new_retain_acked) ⇒ Object
# File 'lib/google/cloud/pubsub/subscription.rb', line 122
def retain_acked= new_retain_acked
  update_grpc = @grpc.dup
  update_grpc.retain_acked_messages = !(!new_retain_acked)
  @grpc = service.update_subscription update_grpc,
                                      :retain_acked_messages
  @lazy = nil
end
#retention ⇒ Numeric
How long to retain unacknowledged messages in the subscription's backlog, from the moment a message is published. If #retain_acked is true, then this also configures the retention of acknowledged messages, and thus configures how far back in time a #seek can be done. Cannot be more than 604,800 seconds (7 days) or less than 600 seconds (10 minutes). Default is 604,800 seconds (7 days).
# File 'lib/google/cloud/pubsub/subscription.rb', line 141
def retention
  ensure_grpc!
  Convert.duration_to_number @grpc.message_retention_duration
end
#retention=(new_retention) ⇒ Object
# File 'lib/google/cloud/pubsub/subscription.rb', line 146
def retention= new_retention
  update_grpc = @grpc.dup
  update_grpc.message_retention_duration = \
    Convert.number_to_duration new_retention
  @grpc = service.update_subscription update_grpc,
                                      :message_retention_duration
  @lazy = nil
end
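The setter above passes the value straight to the service, so a caller may want to check the documented limits (600 to 604,800 seconds) first. A minimal sketch, with a hypothetical helper name `set_retention`:

```ruby
# Hypothetical helper (not part of google-cloud-pubsub): validate a retention
# value against the documented bounds before calling #retention=.
# 600 seconds is 10 minutes; 604_800 seconds is 7 days.
def set_retention subscription, seconds
  min = 600
  max = 604_800
  unless seconds.between?(min, max)
    raise ArgumentError, "retention must be between #{min} and #{max} seconds"
  end
  subscription.retention = seconds
end
```

Failing locally avoids a round trip to the service for a value it would presumably reject.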
#seek(snapshot) ⇒ Boolean
Resets the subscription's backlog to a given Google::Cloud::Pubsub::Snapshot or to a point in time, whichever is provided in the request.
# File 'lib/google/cloud/pubsub/subscription.rb', line 561
def seek snapshot
  ensure_service!
  service.seek name, snapshot
  true
end
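One way #create_snapshot and #seek might be combined is to snapshot the backlog before a risky operation and rewind if it fails. The sketch below illustrates that idea; the helper `with_rollback` is hypothetical and relies only on the two methods documented on this page.

```ruby
# Hypothetical helper (not part of google-cloud-pubsub): snapshot the
# backlog, run the block, and seek the subscription back to the snapshot if
# the block raises. The error is re-raised after the rewind.
def with_rollback subscription, snapshot_name = nil
  snapshot = subscription.create_snapshot snapshot_name
  yield
rescue StandardError
  # snapshot is nil if create_snapshot itself failed; nothing to rewind to.
  subscription.seek snapshot if snapshot
  raise
end
```

The snapshot is left in place on both paths in this sketch; cleaning it up is left to the caller.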
#test_permissions(*permissions) ⇒ Array<String>
Tests the specified permissions against the Cloud IAM access control policy.
# File 'lib/google/cloud/pubsub/subscription.rb', line 678
def test_permissions *permissions
  permissions = Array(permissions).flatten
  ensure_service!
  grpc = service.test_subscription_permissions name, permissions
  grpc.permissions
end
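Since the method returns the subset of requested permissions that the caller holds, the missing ones are a simple array difference. A minimal sketch with a hypothetical helper `missing_permissions`:

```ruby
# Hypothetical helper (not part of google-cloud-pubsub): return the required
# permissions that #test_permissions did not report as granted.
def missing_permissions subscription, *required
  required = required.flatten
  granted = subscription.test_permissions(*required)
  required - granted
end
```

For example, passing "pubsub.subscriptions.get" and "pubsub.subscriptions.consume" yields an empty array when both are granted.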
#topic ⇒ Topic
The Topic from which this subscription receives messages.
# File 'lib/google/cloud/pubsub/subscription.rb', line 86
def topic
  ensure_grpc!
  Topic.new_lazy @grpc.topic, service
end
#update_policy(new_policy) ⇒ Policy Also known as: policy=
Updates the Cloud IAM access control policy for this subscription. The policy should be read from #policy. See Policy for an explanation of the policy etag property and how to modify policies.
You can also update the policy by passing a block to #policy, which will call this method internally after the block completes.
# File 'lib/google/cloud/pubsub/subscription.rb', line 639
def update_policy new_policy
  ensure_service!
  grpc = service.set_subscription_policy name, new_policy.to_grpc
  Policy.from_grpc grpc
end
#wait_for_messages(max: 100) ⇒ Array<Google::Cloud::Pubsub::ReceivedMessage>
Pulls from the server while waiting for messages to become available. This is the same as:
subscription.pull immediate: false
See also #listen for the preferred way to process messages as they become available.
# File 'lib/google/cloud/pubsub/subscription.rb', line 325
def wait_for_messages max: 100
  pull immediate: false, max: max
end