Class: Google::Cloud::Pubsub::Subscription
- Inherits:
-
Object
- Object
- Google::Cloud::Pubsub::Subscription
- Defined in:
- lib/google/cloud/pubsub/subscription.rb,
lib/google/cloud/pubsub/subscription/list.rb
Overview
Subscription
A named resource representing the stream of messages from a single, specific Topic, to be delivered to the subscribing application.
Defined Under Namespace
Classes: List
Instance Method Summary collapse
-
#acknowledge(*messages) ⇒ Object
(also: #ack)
Acknowledges receipt of a message.
-
#deadline ⇒ Object
This value is the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.
-
#delay(new_deadline, *messages) ⇒ Object
Modifies the acknowledge deadline for messages.
-
#delete ⇒ Boolean
Deletes an existing subscription.
-
#endpoint ⇒ Object
Returns the URL locating the endpoint to which messages should be pushed.
-
#endpoint=(new_endpoint) ⇒ Object
Sets the URL locating the endpoint to which messages should be pushed.
-
#exists? ⇒ Boolean
Determines whether the subscription exists in the Pub/Sub service.
-
#listen(max: 100, autoack: false, delay: 1) {|msg| ... } ⇒ Object
Poll the backend for new messages.
-
#name ⇒ Object
The name of the subscription.
-
#policy(force: nil) {|policy| ... } ⇒ Policy
Gets the Cloud IAM access control policy for this subscription.
-
#policy=(new_policy) ⇒ Object
Updates the Cloud IAM access control policy for this subscription.
-
#pull(immediate: true, max: 100, autoack: false) ⇒ Array<Google::Cloud::Pubsub::ReceivedMessage>
Pulls messages from the server.
-
#test_permissions(*permissions) ⇒ Array<String>
Tests the specified permissions against the Cloud IAM access control policy.
-
#topic ⇒ Topic
The Topic from which this subscription receives messages.
-
#wait_for_messages(max: 100, autoack: false) ⇒ Array<Google::Cloud::Pubsub::ReceivedMessage>
Pulls from the server while waiting for messages to become available.
Instance Method Details
#acknowledge(*messages) ⇒ Object Also known as: ack
Acknowledges receipt of a message. After an ack, the Pub/Sub system can remove the message from the subscription. Acknowledging a message whose ack deadline has expired may succeed, although the message may have been sent again. Acknowledging a message more than once will not result in an error. This is only used for messages received via pull.
339 340 341 342 343 344 345 |
# File 'lib/google/cloud/pubsub/subscription.rb', line 339 def acknowledge * ack_ids = coerce_ack_ids return true if ack_ids.empty? ensure_service! service.acknowledge name, *ack_ids true end |
#deadline ⇒ Object
This value is the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.
95 96 97 98 |
# File 'lib/google/cloud/pubsub/subscription.rb', line 95 def deadline ensure_grpc! @grpc.ack_deadline_seconds end |
#delay(new_deadline, *messages) ⇒ Object
Modifies the acknowledge deadline for messages.
This indicates that more time is needed to process the messages, or to make the messages available for redelivery if the processing was interrupted.
372 373 374 375 376 377 |
# File 'lib/google/cloud/pubsub/subscription.rb', line 372 def delay new_deadline, * ack_ids = coerce_ack_ids ensure_service! service.modify_ack_deadline name, ack_ids, new_deadline true end |
#delete ⇒ Boolean
Deletes an existing subscription. All pending messages in the subscription are immediately dropped.
172 173 174 175 176 |
# File 'lib/google/cloud/pubsub/subscription.rb', line 172 def delete ensure_service! service.delete_subscription name true end |
#endpoint ⇒ Object
Returns the URL locating the endpoint to which messages should be pushed.
103 104 105 106 |
# File 'lib/google/cloud/pubsub/subscription.rb', line 103 def endpoint ensure_grpc! @grpc.push_config.push_endpoint if @grpc.push_config end |
#endpoint=(new_endpoint) ⇒ Object
Sets the URL locating the endpoint to which messages should be pushed.
110 111 112 113 114 115 116 117 |
# File 'lib/google/cloud/pubsub/subscription.rb', line 110 def endpoint= new_endpoint ensure_service! service.modify_push_config name, new_endpoint, {} @grpc.push_config = Google::Pubsub::V1::PushConfig.new( push_endpoint: new_endpoint, attributes: {} ) if @grpc end |
#exists? ⇒ Boolean
Determines whether the subscription exists in the Pub/Sub service.
130 131 132 133 134 135 136 137 138 139 |
# File 'lib/google/cloud/pubsub/subscription.rb', line 130 def exists? # Always true if we have a grpc object return true unless @grpc.nil? # If we have a value, return it return @exists unless @exists.nil? ensure_grpc! @exists = !@grpc.nil? rescue Google::Cloud::NotFoundError @exists = false end |
#listen(max: 100, autoack: false, delay: 1) {|msg| ... } ⇒ Object
Poll the backend for new messages. This runs a loop to ping the API, blocking indefinitely, yielding retrieved messages as they are received.
308 309 310 311 312 313 314 315 316 317 |
# File 'lib/google/cloud/pubsub/subscription.rb', line 308 def listen max: 100, autoack: false, delay: 1 loop do msgs = max: max, autoack: autoack if msgs.any? msgs.each { |msg| yield msg } else sleep delay end end end |
#name ⇒ Object
The name of the subscription.
69 70 71 |
# File 'lib/google/cloud/pubsub/subscription.rb', line 69 def name @grpc ? @grpc.name : @name end |
#policy(force: nil) {|policy| ... } ⇒ Policy
Gets the Cloud IAM access control policy for this subscription.
428 429 430 431 432 433 434 435 436 437 438 439 |
# File 'lib/google/cloud/pubsub/subscription.rb', line 428 def policy force: nil @policy = nil if force || block_given? @policy ||= begin ensure_service! grpc = service.get_subscription_policy name Policy.from_grpc grpc end return @policy unless block_given? p = @policy.deep_dup yield p self.policy = p end |
#policy=(new_policy) ⇒ Object
Updates the Cloud IAM access control
policy for this subscription. The policy should be read from
#policy. See Policy for an explanation of
the policy etag
property and how to modify policies.
You can also update the policy by passing a block to #policy, which will call this method internally after the block completes.
468 469 470 471 472 |
# File 'lib/google/cloud/pubsub/subscription.rb', line 468 def policy= new_policy ensure_service! grpc = service.set_subscription_policy name, new_policy.to_grpc @policy = Policy.from_grpc grpc end |
#pull(immediate: true, max: 100, autoack: false) ⇒ Array<Google::Cloud::Pubsub::ReceivedMessage>
Pulls messages from the server. Returns an empty list if there are no
messages available in the backlog. Raises an ApiError with status
UNAVAILABLE
if there are too many concurrent pull requests pending
for the given subscription.
222 223 224 225 226 227 228 229 230 231 232 233 |
# File 'lib/google/cloud/pubsub/subscription.rb', line 222 def pull immediate: true, max: 100, autoack: false ensure_service! = { immediate: immediate, max: max } list_grpc = service.pull name, = Array(list_grpc.).map do |msg_grpc| ReceivedMessage.from_grpc msg_grpc, self end acknowledge if autoack rescue Google::Cloud::DeadlineExceededError [] end |
#test_permissions(*permissions) ⇒ Array<String>
Tests the specified permissions against the Cloud IAM access control policy.
506 507 508 509 510 511 |
# File 'lib/google/cloud/pubsub/subscription.rb', line 506 def * = Array().flatten ensure_service! grpc = service. name, grpc. end |
#topic ⇒ Topic
The Topic from which this subscription receives messages.
86 87 88 89 |
# File 'lib/google/cloud/pubsub/subscription.rb', line 86 def topic ensure_grpc! Topic.new_lazy @grpc.topic, service end |
#wait_for_messages(max: 100, autoack: false) ⇒ Array<Google::Cloud::Pubsub::ReceivedMessage>
Pulls from the server while waiting for messages to become available. This is the same as:
subscription.pull immediate: false
258 259 260 |
# File 'lib/google/cloud/pubsub/subscription.rb', line 258 def max: 100, autoack: false pull immediate: false, max: max, autoack: autoack end |