Class: Google::Cloud::Pubsub::Subscription
- Inherits: Object
- Defined in: lib/google/cloud/pubsub/subscription.rb, lib/google/cloud/pubsub/subscription/list.rb
Overview
Subscription
A named resource representing the stream of messages from a single, specific Topic, to be delivered to the subscribing application.
Defined Under Namespace
Classes: List
Instance Method Summary
- #acknowledge(*messages) ⇒ Object (also: #ack)
  Acknowledges receipt of a message.
- #create_snapshot(snapshot_name = nil) ⇒ Google::Cloud::Pubsub::Snapshot (also: #new_snapshot)
  Creates a new Snapshot from the subscription.
- #deadline ⇒ Object
  This value is the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.
- #deadline=(new_deadline) ⇒ Object
- #delay(new_deadline, *messages) ⇒ Object (also: #modify_ack_deadline)
  Modifies the acknowledge deadline for messages.
- #delete ⇒ Boolean
  Deletes an existing subscription.
- #endpoint ⇒ Object
  Returns the URL locating the endpoint to which messages should be pushed.
- #endpoint=(new_endpoint) ⇒ Object
  Sets the URL locating the endpoint to which messages should be pushed.
- #exists? ⇒ Boolean
  Determines whether the subscription exists in the Pub/Sub service.
- #listen(deadline: nil, streams: nil, inventory: nil, threads: {}) {|msg| ... } ⇒ Subscriber
  Create a Subscriber object that receives and processes messages using the code provided in the callback.
- #name ⇒ Object
  The name of the subscription.
- #policy {|policy| ... } ⇒ Policy
  Gets the Cloud IAM access control policy for this subscription.
- #policy=(new_policy) ⇒ Policy
  Updates the Cloud IAM access control policy for this subscription.
- #pull(immediate: true, max: 100) ⇒ Array<Google::Cloud::Pubsub::ReceivedMessage>
  Pulls messages from the server.
- #retain_acked ⇒ Boolean
  Indicates whether to retain acknowledged messages.
- #retain_acked=(new_retain_acked) ⇒ Object
- #retention ⇒ Numeric
  How long to retain unacknowledged messages in the subscription's backlog, from the moment a message is published.
- #retention=(new_retention) ⇒ Object
- #seek(snapshot) ⇒ Boolean
  Resets the subscription's backlog to a given Snapshot or to a point in time, whichever is provided in the request.
- #test_permissions(*permissions) ⇒ Array<String>
  Tests the specified permissions against the Cloud IAM access control policy.
- #topic ⇒ Topic
  The Topic from which this subscription receives messages.
- #wait_for_messages(max: 100) ⇒ Array<Google::Cloud::Pubsub::ReceivedMessage>
  Pulls from the server while waiting for messages to become available.
Instance Method Details
#acknowledge(*messages) ⇒ Object Also known as: ack
Acknowledges receipt of a message. After an ack, the Pub/Sub system can remove the message from the subscription. Acknowledging a message whose ack deadline has expired may succeed, although the message may have been sent again. Acknowledging a message more than once will not result in an error. This is only used for messages received via pull.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 397
    def acknowledge *messages
      ack_ids = coerce_ack_ids messages
      return true if ack_ids.empty?
      ensure_service!
      service.acknowledge name, *ack_ids
      true
    end
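The source above passes its arguments through a private helper, coerce_ack_ids, whose body is not shown on this page. A minimal sketch of how such a helper might normalize mixed arguments, assuming it accepts both message objects and raw ack ID strings; FakeMessage is a hypothetical stand-in for ReceivedMessage, and the helper body is an assumption, not the library's confirmed implementation:

```ruby
# Hypothetical stand-in for ReceivedMessage; only ack_id matters here.
FakeMessage = Struct.new(:ack_id)

# Assumed behavior: flatten nested arguments and reduce each entry
# to an ack ID string.
def coerce_ack_ids(messages)
  Array(messages).flatten.map do |msg|
    msg.respond_to?(:ack_id) ? msg.ack_id : msg.to_s
  end
end

ids = coerce_ack_ids([FakeMessage.new("ack-1"), ["ack-2", FakeMessage.new("ack-3")]])
puts ids.inspect
```

Under this assumption an empty argument list yields an empty array, which matches the early `return true if ack_ids.empty?` in the method body.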
#create_snapshot(snapshot_name = nil) ⇒ Google::Cloud::Pubsub::Snapshot Also known as: new_snapshot
Creates a new Google::Cloud::Pubsub::Snapshot from the subscription. The created snapshot is guaranteed to retain:
- The existing backlog on the subscription. More precisely, this is defined as the messages in the subscription's backlog that are unacknowledged upon the successful completion of the create_snapshot operation; as well as:
- Any messages published to the subscription's topic following the successful completion of the create_snapshot operation.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 478
    def create_snapshot snapshot_name = nil
      ensure_service!
      grpc = service.create_snapshot name, snapshot_name
      Snapshot.from_grpc grpc, service
    end
#deadline ⇒ Object
This value is the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 87
    def deadline
      ensure_grpc!
      @grpc.ack_deadline_seconds
    end
#deadline=(new_deadline) ⇒ Object
    # File 'lib/google/cloud/pubsub/subscription.rb', line 92
    def deadline= new_deadline
      update_grpc = @grpc.dup
      update_grpc.ack_deadline_seconds = new_deadline
      @grpc = service.update_subscription update_grpc,
                                          :ack_deadline_seconds
      @lazy = nil
      self
    end
#delay(new_deadline, *messages) ⇒ Object Also known as: modify_ack_deadline
Modifies the acknowledge deadline for messages.
Use this either to indicate that more time is needed to process the messages, or to make the messages available for redelivery if processing was interrupted.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 430
    def delay new_deadline, *messages
      ack_ids = coerce_ack_ids messages
      ensure_service!
      service.modify_ack_deadline name, ack_ids, new_deadline
      true
    end
#delete ⇒ Boolean
Deletes an existing subscription. All pending messages in the subscription are immediately dropped.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 222
    def delete
      ensure_service!
      service.delete_subscription name
      true
    end
#endpoint ⇒ Object
Returns the URL locating the endpoint to which messages should be pushed.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 153
    def endpoint
      ensure_grpc!
      @grpc.push_config.push_endpoint if @grpc.push_config
    end
#endpoint=(new_endpoint) ⇒ Object
Sets the URL locating the endpoint to which messages should be pushed.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 160
    def endpoint= new_endpoint
      ensure_service!
      service.modify_push_config name, new_endpoint, {}
      @grpc.push_config = Google::Pubsub::V1::PushConfig.new(
        push_endpoint: new_endpoint,
        attributes: {}
      ) if @grpc
    end
#exists? ⇒ Boolean
Determines whether the subscription exists in the Pub/Sub service.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 180
    def exists?
      # Always true if the object is not set as lazy
      return true unless lazy?
      # If we have a value, return it
      return @exists unless @exists.nil?
      ensure_grpc!
      @exists = true
    rescue Google::Cloud::NotFoundError
      @exists = false
    end
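The method above caches its answer, so a missing subscription is looked up only once. A minimal sketch of that memoize-and-rescue pattern in isolation; LazyResource, NotFound, and fetch! are hypothetical stand-ins for the subscription, Google::Cloud::NotFoundError, and ensure_grpc!, and no real service call is made:

```ruby
# Hypothetical error class standing in for a "not found" API error.
class NotFound < StandardError; end

class LazyResource
  attr_reader :lookups

  def initialize(present)
    @present = present
    @exists = nil
    @lookups = 0
  end

  def exists?
    # Reuse a cached answer, including a cached false.
    return @exists unless @exists.nil?
    fetch!
    @exists = true
  rescue NotFound
    @exists = false
  end

  private

  # Simulates the remote lookup that raises when the resource is missing.
  def fetch!
    @lookups += 1
    raise NotFound unless @present
  end
end

missing = LazyResource.new(false)
first = missing.exists?
second = missing.exists?
puts "exists: #{second}, lookups: #{missing.lookups}"
```

Checking `@exists.nil?` rather than plain truthiness is what lets a cached `false` short-circuit the second call.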
#listen(deadline: nil, streams: nil, inventory: nil, threads: {}) {|msg| ... } ⇒ Subscriber
Create a Google::Cloud::Pubsub::Subscriber object that receives and processes messages using the code provided in the callback.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 367
    def listen deadline: nil, streams: nil, inventory: nil, threads: {},
               &block
      ensure_service!
      deadline ||= self.deadline
      Subscriber.new name, block, deadline: deadline, streams: streams,
                                  inventory: inventory, threads: threads,
                                  service: service
    end
#name ⇒ Object
The name of the subscription.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 61
    def name
      @grpc.name
    end
#policy {|policy| ... } ⇒ Policy
Gets the Cloud IAM access control policy for this subscription.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 571
    def policy
      ensure_service!
      grpc = service.get_subscription_policy name
      policy = Policy.from_grpc grpc
      return policy unless block_given?
      yield policy
      self.policy = policy
    end
#policy=(new_policy) ⇒ Policy
Updates the Cloud IAM access control policy for this subscription. The policy should be read from #policy. See Policy for an explanation of the policy etag property and how to modify policies.
You can also update the policy by passing a block to #policy, which will call this method internally after the block completes.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 609
    def policy= new_policy
      ensure_service!
      grpc = service.set_subscription_policy name, new_policy.to_grpc
      Policy.from_grpc grpc
    end
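The block form of #policy is a read-modify-write cycle: fetch the policy, let the caller mutate it, then write it back through #policy=. A minimal sketch of that control flow, using hypothetical FakePolicy and FakeSubscription stand-ins that keep the policy in memory rather than calling the Pub/Sub service; the role and member strings are illustrative only:

```ruby
# Hypothetical stand-ins; the real Policy class and service calls are not modeled.
FakePolicy = Struct.new(:etag, :roles)

class FakeSubscription
  attr_reader :stored

  def initialize
    @stored = FakePolicy.new("etag-1", {})
  end

  # Without a block: return a copy of the stored policy.
  # With a block: yield the copy, then write it back through policy=.
  def policy
    current = FakePolicy.new(@stored.etag, @stored.roles.dup)
    return current unless block_given?
    yield current
    self.policy = current
  end

  def policy=(new_policy)
    @stored = new_policy
  end
end

sub = FakeSubscription.new
sub.policy do |p|
  (p.roles["roles/pubsub.viewer"] ||= []) << "user:viewer@example.com"
end
puts sub.stored.roles.inspect
```

Calling `policy` without a block returns a copy and leaves the stored policy unchanged in this sketch, mirroring the early `return policy unless block_given?` in the source.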
#pull(immediate: true, max: 100) ⇒ Array<Google::Cloud::Pubsub::ReceivedMessage>
Pulls messages from the server. Returns an empty list if there are no messages available in the backlog. Raises an ApiError with status UNAVAILABLE if there are too many concurrent pull requests pending for the given subscription.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 270
    def pull immediate: true, max: 100
      ensure_service!
      options = { immediate: immediate, max: max }
      list_grpc = service.pull name, options
      Array(list_grpc.received_messages).map do |msg_grpc|
        ReceivedMessage.from_grpc msg_grpc, self
      end
    rescue Google::Cloud::DeadlineExceededError
      []
    end
#retain_acked ⇒ Boolean
Indicates whether to retain acknowledged messages. If true, then messages are not expunged from the subscription's backlog, even if they are acknowledged, until they fall out of the #retention window. Default is false.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 110
    def retain_acked
      ensure_grpc!
      @grpc.retain_acked_messages
    end
#retain_acked=(new_retain_acked) ⇒ Object
    # File 'lib/google/cloud/pubsub/subscription.rb', line 115
    def retain_acked= new_retain_acked
      update_grpc = @grpc.dup
      update_grpc.retain_acked_messages = !(!new_retain_acked)
      @grpc = service.update_subscription update_grpc,
                                          :retain_acked_messages
      @lazy = nil
      self
    end
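The setter coerces its argument with a double negation, so any Ruby value is stored as a strict true or false. A short sketch of that coercion on its own; to_bool is a hypothetical helper name used only for illustration:

```ruby
# Double negation maps any Ruby value to strictly true or false.
# Only nil and false are falsy in Ruby; 0 and "" are truthy.
def to_bool(value)
  !(!value)
end

results = [nil, false, 0, "", :yes].map { |v| to_bool(v) }
puts results.inspect
```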
#retention ⇒ Numeric
How long to retain unacknowledged messages in the subscription's backlog, from the moment a message is published. If #retain_acked is true, then this also configures the retention of acknowledged messages, and thus configures how far back in time a #seek can be done. Cannot be more than 604,800 seconds (7 days) or less than 600 seconds (10 minutes). Default is 604,800 seconds (7 days).
    # File 'lib/google/cloud/pubsub/subscription.rb', line 135
    def retention
      ensure_grpc!
      Convert.duration_to_number @grpc.message_retention_duration
    end
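The documented bounds (600 to 604,800 seconds) can be checked on the client before a value is sent. A minimal sketch, assuming a hypothetical valid_retention? helper; the source shown on this page does not perform this validation itself:

```ruby
# Bounds taken from the description above, in seconds.
MIN_RETENTION = 10 * 60          # 10 minutes = 600 seconds
MAX_RETENTION = 7 * 24 * 60 * 60 # 7 days = 604,800 seconds

# Hypothetical helper, not part of the library.
def valid_retention?(seconds)
  seconds.is_a?(Numeric) && (MIN_RETENTION..MAX_RETENTION).cover?(seconds)
end

puts valid_retention?(3600)
```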
#retention=(new_retention) ⇒ Object
    # File 'lib/google/cloud/pubsub/subscription.rb', line 140
    def retention= new_retention
      update_grpc = @grpc.dup
      update_grpc.message_retention_duration = \
        Convert.number_to_duration new_retention
      @grpc = service.update_subscription update_grpc,
                                          :message_retention_duration
      @lazy = nil
      self
    end
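The getter and setter convert between a plain number of seconds and a duration value through Convert helpers whose bodies are not shown on this page. A sketch of what such a round trip could look like, assuming the duration carries whole seconds plus nanoseconds; FakeDuration and both function bodies are assumptions for illustration, not the library's confirmed code:

```ruby
# Hypothetical stand-in for a protobuf-style duration
# (whole seconds plus nanoseconds).
FakeDuration = Struct.new(:seconds, :nanos)

# Assumed shape of number_to_duration: split a Numeric into its parts.
def number_to_duration(number)
  return nil if number.nil?
  FakeDuration.new(number.to_i, (number.remainder(1) * 1_000_000_000).round)
end

# Assumed shape of duration_to_number: the inverse of the above.
def duration_to_number(duration)
  return nil if duration.nil?
  return duration.seconds if duration.nanos.zero?
  duration.seconds + duration.nanos / 1_000_000_000.0
end

d = number_to_duration(600.5)
puts duration_to_number(d)
```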
#seek(snapshot) ⇒ Boolean
Resets the subscription's backlog to a given Google::Cloud::Pubsub::Snapshot or to a point in time, whichever is provided in the request.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 531
    def seek snapshot
      ensure_service!
      service.seek name, snapshot
      true
    end
#test_permissions(*permissions) ⇒ Array<String>
Tests the specified permissions against the Cloud IAM access control policy.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 647
    def test_permissions *permissions
      permissions = Array(permissions).flatten
      ensure_service!
      grpc = service.test_subscription_permissions name, permissions
      grpc.permissions
    end
#topic ⇒ Topic
The Topic from which this subscription receives messages.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 78
    def topic
      ensure_grpc!
      Topic.new_lazy @grpc.topic, service
    end
#wait_for_messages(max: 100) ⇒ Array<Google::Cloud::Pubsub::ReceivedMessage>
Pulls from the server while waiting for messages to become available. This is the same as:
subscription.pull immediate: false
    # File 'lib/google/cloud/pubsub/subscription.rb', line 302
    def wait_for_messages max: 100
      pull immediate: false, max: max
    end