Class: Google::Cloud::Pubsub::Subscription
- Inherits: Object
  - Object
    - Google::Cloud::Pubsub::Subscription
- Defined in: lib/google/cloud/pubsub/subscription.rb, lib/google/cloud/pubsub/subscription/list.rb
Overview
Subscription
A named resource representing the stream of messages from a single, specific Topic, to be delivered to the subscribing application.
Defined Under Namespace
Classes: List
Instance Method Summary
- #acknowledge(*messages) ⇒ Object (also: #ack)
  Acknowledges receipt of a message.
- #deadline ⇒ Object
  This value is the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.
- #delay(new_deadline, *messages) ⇒ Object
  Modifies the acknowledge deadline for messages.
- #delete ⇒ Boolean
  Deletes an existing subscription.
- #endpoint ⇒ Object
  Returns the URL locating the endpoint to which messages should be pushed.
- #endpoint=(new_endpoint) ⇒ Object
  Sets the URL locating the endpoint to which messages should be pushed.
- #exists? ⇒ Boolean
  Determines whether the subscription exists in the Pub/Sub service.
- #listen(max: 100, autoack: false, delay: 1) {|msg| ... } ⇒ Object
  Poll the backend for new messages.
- #name ⇒ Object
  The name of the subscription.
- #policy(force: nil) {|policy| ... } ⇒ Policy
  Gets the Cloud IAM access control policy for this subscription.
- #policy=(new_policy) ⇒ Object
  Updates the Cloud IAM access control policy for this subscription.
- #pull(immediate: true, max: 100, autoack: false) ⇒ Array<Google::Cloud::Pubsub::ReceivedMessage>
  Pulls messages from the server.
- #test_permissions(*permissions) ⇒ Array<String>
  Tests the specified permissions against the Cloud IAM access control policy.
- #topic ⇒ Topic
  The Topic from which this subscription receives messages.
- #wait_for_messages(max: 100, autoack: false) ⇒ Array<Google::Cloud::Pubsub::ReceivedMessage>
  Pulls from the server while waiting for messages to become available.
Instance Method Details
#acknowledge(*messages) ⇒ Object Also known as: ack
Acknowledges receipt of a message. After an ack, the Pub/Sub system can remove the message from the subscription. Acknowledging a message whose ack deadline has expired may succeed, although the message may have been sent again. Acknowledging a message more than once will not result in an error. This is only used for messages received via pull.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 352
    def acknowledge *messages
      ack_ids = coerce_ack_ids messages
      return true if ack_ids.empty?
      ensure_service!
      service.acknowledge name, *ack_ids
      true
    end
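The listing above relies on a private helper, coerce_ack_ids, whose body is not shown on this page. The following is a minimal sketch of what such a helper might do, assuming it accepts message objects, raw ack ID strings, or nested arrays of either. FakeMessage, acknowledge_sketch, and the service_calls array are hypothetical stand-ins so the example runs without the gem or credentials.

```ruby
# Hypothetical stand-in for a received message; only ack_id matters here.
class FakeMessage
  attr_reader :ack_id

  def initialize(ack_id)
    @ack_id = ack_id
  end
end

# Assumed normalization: flatten the arguments and turn each entry into
# an ack ID string, whether it is a message object or already an ID.
def coerce_ack_ids(messages)
  Array(messages).flatten.map do |msg|
    msg.respond_to?(:ack_id) ? msg.ack_id : msg.to_s
  end
end

# Same shape as #acknowledge: nothing to ack means no service call.
# service_calls records what would have been sent to the service.
def acknowledge_sketch(service_calls, *messages)
  ack_ids = coerce_ack_ids(messages)
  return true if ack_ids.empty?
  service_calls << ack_ids
  true
end
```

Under these assumptions, acknowledge_sketch(calls, msg, ["id-2", other_msg]) records a single call with three ack IDs, and a call with no messages returns true without recording anything.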
#deadline ⇒ Object
This value is the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 97
    def deadline
      ensure_grpc!
      @grpc.ack_deadline_seconds
    end
#delay(new_deadline, *messages) ⇒ Object
Modifies the acknowledge deadline for messages.
Use this to indicate that more time is needed to process the messages, or to make the messages available for redelivery if processing was interrupted.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 386
    def delay new_deadline, *messages
      ack_ids = coerce_ack_ids messages
      ensure_service!
      service.modify_ack_deadline name, ack_ids, new_deadline
      true
    end
#delete ⇒ Boolean
Deletes an existing subscription. All pending messages in the subscription are immediately dropped.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 177
    def delete
      ensure_service!
      service.delete_subscription name
      true
    end
#endpoint ⇒ Object
Returns the URL locating the endpoint to which messages should be pushed.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 105
    def endpoint
      ensure_grpc!
      @grpc.push_config.push_endpoint if @grpc.push_config
    end
#endpoint=(new_endpoint) ⇒ Object
Sets the URL locating the endpoint to which messages should be pushed.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 112
    def endpoint= new_endpoint
      ensure_service!
      service.modify_push_config name, new_endpoint, {}
      @grpc.push_config = Google::Pubsub::V1::PushConfig.new(
        push_endpoint: new_endpoint,
        attributes: {}
      ) if @grpc
    end
#exists? ⇒ Boolean
Determines whether the subscription exists in the Pub/Sub service.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 133
    def exists?
      # Always true if we have a grpc object
      return true unless @grpc.nil?
      # If we have a value, return it
      return @exists unless @exists.nil?
      ensure_grpc!
      @exists = !@grpc.nil?
    rescue Google::Cloud::NotFoundError
      @exists = false
    end
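The existence check above caches three states: unknown (nil), exists (true), and missing (false). The sketch below isolates that pattern so it can be run without the service. LazyResource, its fetcher callable, the lookups counter, and the local NotFoundError class are all hypothetical stand-ins, not part of the library.

```ruby
# Stand-in for Google::Cloud::NotFoundError.
class NotFoundError < StandardError; end

class LazyResource
  attr_reader :lookups

  # fetcher is a hypothetical callable that returns the resource
  # or raises NotFoundError when it does not exist.
  def initialize(fetcher)
    @fetcher = fetcher
    @resource = nil
    @exists = nil
    @lookups = 0
  end

  def exists?
    # A loaded resource always exists.
    return true unless @resource.nil?
    # A cached answer is returned even when it is false.
    return @exists unless @exists.nil?
    @lookups += 1
    @resource = @fetcher.call
    @exists = !@resource.nil?
  rescue NotFoundError
    @exists = false
  end
end
```

The explicit nil? checks matter here: a plain `@exists ||= ...` would repeat the lookup every time the cached answer is false.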
#listen(max: 100, autoack: false, delay: 1) {|msg| ... } ⇒ Object
Poll the backend for new messages. This runs a loop to ping the API, blocking indefinitely, yielding retrieved messages as they are received.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 320
    def listen max: 100, autoack: false, delay: 1
      loop do
        msgs = wait_for_messages max: max, autoack: autoack
        if msgs.any?
          msgs.each { |msg| yield msg }
        else
          sleep delay
        end
      end
    end
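Because the loop blocks indefinitely, a caller that wants to stop has to leave the block itself. The sketch below shows one way to do that, assuming the loop fetches each batch with #wait_for_messages. FakeSubscription and take_messages are hypothetical stand-ins that serve canned batches instead of calling the API.

```ruby
# Hypothetical stand-in that serves canned batches instead of calling the API.
class FakeSubscription
  def initialize(batches)
    @batches = batches
  end

  # Returns the next canned batch, or an empty array once none are left.
  # autoack is accepted for signature parity and ignored here.
  def wait_for_messages(max: 100, autoack: false)
    (@batches.shift || []).first(max)
  end

  # Same loop shape as #listen: yield each message, sleep when idle.
  def listen(max: 100, autoack: false, delay: 1)
    loop do
      msgs = wait_for_messages max: max, autoack: autoack
      if msgs.any?
        msgs.each { |msg| yield msg }
      else
        sleep delay
      end
    end
  end
end

# Collects messages until count have been seen, then breaks out of listen.
def take_messages(subscription, count)
  seen = []
  subscription.listen(delay: 0.01) do |msg|
    seen << msg
    break if seen.size >= count
  end
  seen
end
```

The break inside the block ends the listen call as a whole, which is the only exit path short of an exception.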
#name ⇒ Object
The name of the subscription.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 70
    def name
      @grpc ? @grpc.name : @name
    end
#policy(force: nil) {|policy| ... } ⇒ Policy
Gets the Cloud IAM access control policy for this subscription.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 445
    def policy force: nil
      @policy = nil if force || block_given?
      @policy ||= begin
        ensure_service!
        grpc = service.get_subscription_policy name
        Policy.from_grpc grpc
      end
      return @policy unless block_given?
      p = @policy.deep_dup
      yield p
      self.policy = p
    end
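The method above combines caching with a read-modify-write block form: a block or force: true discards the cached policy, and the block receives a copy that is written back afterwards. The sketch below reproduces that control flow with in-memory stand-ins. FakePolicy, FakeResource, the fetches counter, and the example role and member strings are hypothetical and only illustrate the pattern.

```ruby
# Hypothetical stand-in for Policy: a hash of role => member list.
class FakePolicy
  attr_reader :roles

  def initialize(roles = {})
    @roles = roles
  end

  def deep_dup
    FakePolicy.new(@roles.transform_values(&:dup))
  end
end

class FakeResource
  attr_reader :fetches

  def initialize
    @remote = FakePolicy.new("roles/viewer" => ["user:a@example.com"])
    @policy = nil
    @fetches = 0
  end

  def policy(force: nil)
    # A block or force: true discards the cached copy.
    @policy = nil if force || block_given?
    @policy ||= begin
      @fetches += 1
      @remote.deep_dup # stands in for the get-policy service call
    end
    return @policy unless block_given?
    p = @policy.deep_dup
    yield p
    self.policy = p
  end

  def policy=(new_policy)
    @remote = new_policy.deep_dup # stands in for the set-policy service call
    @policy = new_policy
  end
end
```

Yielding a copy rather than the cached object means a block that raises partway through leaves the cached policy untouched.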
#policy=(new_policy) ⇒ Object
Updates the Cloud IAM access control policy for this subscription. The policy should be read from #policy. See Policy for an explanation of the policy etag property and how to modify policies.
You can also update the policy by passing a block to #policy, which will call this method internally after the block completes.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 486
    def policy= new_policy
      ensure_service!
      grpc = service.set_subscription_policy name, new_policy.to_grpc
      @policy = Policy.from_grpc grpc
    end
#pull(immediate: true, max: 100, autoack: false) ⇒ Array<Google::Cloud::Pubsub::ReceivedMessage>
Pulls messages from the server. Returns an empty list if there are no messages available in the backlog. Raises an ApiError with status UNAVAILABLE if there are too many concurrent pull requests pending for the given subscription.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 230
    def pull immediate: true, max: 100, autoack: false
      ensure_service!
      options = { immediate: immediate, max: max }
      list_grpc = service.pull name, options
      messages = Array(list_grpc.received_messages).map do |msg_grpc|
        ReceivedMessage.from_grpc msg_grpc, self
      end
      acknowledge messages if autoack
      messages
    rescue Google::Cloud::DeadlineExceededError
      []
    end
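Two behaviors in the listing are easy to miss: a deadline-exceeded error is turned into an empty array, and autoack acknowledges the batch before it is returned. The sketch below illustrates both with a stand-in. FakePuller, its backend callable, the acked list, and the local DeadlineExceededError class are hypothetical and replace the real service calls.

```ruby
# Stand-in for Google::Cloud::DeadlineExceededError.
class DeadlineExceededError < StandardError; end

class FakePuller
  attr_reader :acked

  # backend is a hypothetical callable that takes max and returns messages,
  # or raises DeadlineExceededError when the request times out.
  def initialize(backend)
    @backend = backend
    @acked = []
  end

  def pull(max: 100, autoack: false)
    messages = Array(@backend.call(max))
    # Stands in for acknowledging the batch through the service.
    @acked.concat(messages) if autoack
    messages
  rescue DeadlineExceededError
    # A timed-out pull looks the same to the caller as an empty backlog.
    []
  end
end
```

One consequence worth keeping in mind: with autoack enabled, messages are acknowledged before the caller has processed them, so a crash during processing would not trigger redelivery.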
#test_permissions(*permissions) ⇒ Array<String>
Tests the specified permissions against the Cloud IAM access control policy.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 525
    def test_permissions *permissions
      permissions = Array(permissions).flatten
      ensure_service!
      grpc = service.test_subscription_permissions name, permissions
      grpc.permissions
    end
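The splat plus Array(...).flatten normalization means the method can be called with separate arguments or with a single array. The sketch below shows that equivalence, assuming the service returns the subset of requested permissions that the caller holds. GRANTED and test_permissions_sketch are hypothetical stand-ins for the remote policy check.

```ruby
# Hypothetical set of permissions the caller is assumed to hold.
GRANTED = ["pubsub.subscriptions.get", "pubsub.subscriptions.consume"].freeze

# Same argument handling as #test_permissions; the select call stands in
# for the service round trip and keeps only the granted permissions.
def test_permissions_sketch(*permissions)
  permissions = Array(permissions).flatten
  permissions.select { |perm| GRANTED.include?(perm) }
end

# Both call styles produce the same normalized request.
as_args  = test_permissions_sketch("pubsub.subscriptions.get",
                                   "pubsub.subscriptions.delete")
as_array = test_permissions_sketch(["pubsub.subscriptions.get",
                                    "pubsub.subscriptions.delete"])
puts as_args == as_array
```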
#topic ⇒ Topic
The Topic from which this subscription receives messages.
    # File 'lib/google/cloud/pubsub/subscription.rb', line 88
    def topic
      ensure_grpc!
      Topic.new_lazy @grpc.topic, service
    end
#wait_for_messages(max: 100, autoack: false) ⇒ Array<Google::Cloud::Pubsub::ReceivedMessage>
Pulls from the server while waiting for messages to become available. This is the same as:
subscription.pull immediate: false
    # File 'lib/google/cloud/pubsub/subscription.rb', line 267
    def wait_for_messages max: 100, autoack: false
      pull immediate: false, max: max, autoack: autoack
    end