Class: Google::Cloud::Pubsub::Subscriber
- Inherits: Object
  - Object
  - Google::Cloud::Pubsub::Subscriber
- Includes: MonitorMixin
- Defined in:
  lib/google/cloud/pubsub/subscriber.rb,
  lib/google/cloud/pubsub/subscriber/stream.rb,
  lib/google/cloud/pubsub/subscriber/inventory.rb,
  lib/google/cloud/pubsub/subscriber/enumerator_queue.rb,
  lib/google/cloud/pubsub/subscriber/async_unary_pusher.rb,
  lib/google/cloud/pubsub/subscriber/async_stream_pusher.rb
Overview
Subscriber object used to stream and process messages from a Subscription. See Google::Cloud::Pubsub::Subscription#listen.
Instance Attribute Summary
- #callback ⇒ Proc (readonly)
  The procedure that will handle the messages received from the subscription.
- #callback_threads ⇒ Integer (readonly)
  The number of threads used to handle the received messages.
- #deadline ⇒ Numeric (readonly)
  The default number of seconds the stream will hold received messages before modifying the message's ack deadline.
- #inventory ⇒ Integer (readonly)
  The number of received messages to be collected by the subscriber.
- #push_threads ⇒ Integer (readonly)
  The number of threads used to handle acknowledgement (ReceivedMessage#ack!) and delay (ReceivedMessage#nack!, ReceivedMessage#delay!) messages.
- #streams ⇒ Integer (readonly)
  The number of concurrent streams to open to pull messages from the subscription.
- #subscription_name ⇒ String (readonly)
  The name of the subscription the messages are pulled from.
Instance Method Summary
- #last_error ⇒ Exception?
  The most recent unhandled error to occur while listening to messages on the subscriber.
- #on_error {|callback| ... } ⇒ Object
  Register to be notified of errors when raised.
- #start ⇒ Subscriber
  Starts the subscriber pulling from the subscription and processing the received messages.
- #started? ⇒ boolean
  Whether the subscriber has been started.
- #stop ⇒ Subscriber
  Begins the process of stopping the subscriber.
- #stopped? ⇒ boolean
  Whether the subscriber has been stopped.
- #wait! ⇒ Subscriber
  Blocks until the subscriber is fully stopped and all received messages have been handled.
Instance Attribute Details
#callback ⇒ Proc (readonly)
The procedure that will handle the messages received from the subscription.
    # File 'lib/google/cloud/pubsub/subscriber.rb', line 62
    def callback
      @callback
    end
#callback_threads ⇒ Integer (readonly)
The number of threads used to handle the received messages. Default is 8.
    # File 'lib/google/cloud/pubsub/subscriber.rb', line 62
    def callback_threads
      @callback_threads
    end
#deadline ⇒ Numeric (readonly)
The default number of seconds the stream will hold received messages before modifying the message's ack deadline. The minimum is 10, the maximum is 600. Default is 60.
    # File 'lib/google/cloud/pubsub/subscriber.rb', line 62
    def deadline
      @deadline
    end
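The documented bounds for the deadline can be checked before a value is handed to the library. The following is a sketch only: `deadline_in_range?` is a hypothetical helper, not part of the library, and this page does not say whether the library itself rejects or adjusts out-of-range values.

```ruby
# Hypothetical helper (not part of google-cloud-pubsub).
# Returns true when the value lies within the documented bounds:
# a minimum of 10 seconds and a maximum of 600 seconds.
def deadline_in_range? seconds
  seconds.is_a?(Numeric) && seconds.real? && seconds.between?(10, 600)
end

default_ok = deadline_in_range? 60   # the documented default
too_short  = deadline_in_range? 5    # below the documented minimum
```

Here `default_ok` is true and `too_short` is false.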
#inventory ⇒ Integer (readonly)
The number of received messages to be collected by the subscriber. Default is 1,000.
    # File 'lib/google/cloud/pubsub/subscriber.rb', line 62
    def inventory
      @inventory
    end
#push_threads ⇒ Integer (readonly)
The number of threads used to handle acknowledgement (ReceivedMessage#ack!) and delay (ReceivedMessage#nack!, ReceivedMessage#delay!) messages. Default is 4.
    # File 'lib/google/cloud/pubsub/subscriber.rb', line 62
    def push_threads
      @push_threads
    end
#streams ⇒ Integer (readonly)
The number of concurrent streams to open to pull messages from the subscription. Default is 4.
    # File 'lib/google/cloud/pubsub/subscriber.rb', line 62
    def streams
      @streams
    end
#subscription_name ⇒ String (readonly)
The name of the subscription the messages are pulled from.
    # File 'lib/google/cloud/pubsub/subscriber.rb', line 62
    def subscription_name
      @subscription_name
    end
Instance Method Details
#last_error ⇒ Exception?
The most recent unhandled error to occur while listening to messages on the subscriber.
If an unhandled error has occurred, the subscriber will attempt to recover from the error and resume listening.
    # File 'lib/google/cloud/pubsub/subscriber.rb', line 244
    def last_error
      synchronize { @last_error }
    end
#on_error {|callback| ... } ⇒ Object
Register to be notified of errors when raised.
If an unhandled error has occurred, the subscriber will attempt to recover from the error and resume listening.
Multiple error handlers can be added.
    # File 'lib/google/cloud/pubsub/subscriber.rb', line 208
    def on_error &block
      synchronize do
        @error_callbacks << block
      end
    end
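The registration pattern above (handlers appended to a list while holding the monitor lock) can be illustrated with a small stand-in class. This is a minimal sketch, not the library's implementation: `ErrorRegistry` and its `report` method are hypothetical names, and how the real subscriber invokes the registered handlers is not shown on this page.

```ruby
require "monitor"

# Hypothetical stand-in that mirrors the #on_error / #last_error pair.
class ErrorRegistry
  include MonitorMixin

  def initialize
    super() # sets up the monitor provided by MonitorMixin
    @error_callbacks = []
    @last_error = nil
  end

  # Register a handler; several handlers may be added.
  def on_error &block
    synchronize { @error_callbacks << block }
  end

  # Read the most recent error under the lock.
  def last_error
    synchronize { @last_error }
  end

  # Assumption: record the error, then notify every handler in
  # registration order. Handlers run outside the lock.
  def report error
    handlers = synchronize do
      @last_error = error
      @error_callbacks.dup
    end
    handlers.each { |handler| handler.call error }
  end
end

registry = ErrorRegistry.new
seen = []
registry.on_error { |error| seen << "log: #{error.message}" }
registry.on_error { |error| seen << "metric: #{error.class}" }
registry.report RuntimeError.new("stream closed")
```

Copying the handler list under the lock and calling the handlers afterwards keeps a slow handler from blocking other threads that need the monitor. That is a design choice of this sketch, not a documented property of the library.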
#start ⇒ Subscriber
Starts the subscriber pulling from the subscription and processing the received messages.
    # File 'lib/google/cloud/pubsub/subscriber.rb', line 104
    def start
      start_pool = synchronize do
        @started = true
        @stopped = false

        @stream_pool.map do |stream|
          Thread.new { stream.start }
        end
      end
      start_pool.map(&:join)

      self
    end
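The method body above starts every stream on its own thread and joins all of the threads before returning. A minimal standalone sketch of that fan-out/join pattern follows; `DemoStream` is a hypothetical stand-in for the library's internal stream objects, and the `sleep` only simulates work.

```ruby
# Hypothetical stand-in for an internal stream object.
class DemoStream
  attr_reader :started

  def initialize
    @started = false
  end

  def start
    sleep 0.01 # simulate the time taken to open a connection
    @started = true
  end
end

# Four streams, matching the documented default for #streams.
stream_pool = Array.new(4) { DemoStream.new }

# Fan out: one thread per stream, so the streams start concurrently.
threads = stream_pool.map do |stream|
  Thread.new { stream.start }
end

# Join: block until every stream has finished starting.
threads.map(&:join)

all_started = stream_pool.all?(&:started)
```

Because the caller joins every thread, the call returns only after each stream's `start` has completed.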
#started? ⇒ boolean
Whether the subscriber has been started.
    # File 'lib/google/cloud/pubsub/subscriber.rb', line 161
    def started?
      synchronize { @started }
    end
#stop ⇒ Subscriber
Begins the process of stopping the subscriber. Unhandled received messages will be processed, but no new messages will be pulled from the subscription. Use #wait! to block until the subscriber is fully stopped and all received messages have been processed.
    # File 'lib/google/cloud/pubsub/subscriber.rb', line 125
    def stop
      stop_pool = synchronize do
        @started = false
        @stopped = true

        @stream_pool.map do |stream|
          Thread.new { stream.stop }
        end
      end
      stop_pool.map(&:join)

      self
    end
#stopped? ⇒ boolean
Whether the subscriber has been stopped.
    # File 'lib/google/cloud/pubsub/subscriber.rb', line 169
    def stopped?
      synchronize { @stopped }
    end
#wait! ⇒ Subscriber
Blocks until the subscriber is fully stopped and all received messages have been handled.
    # File 'lib/google/cloud/pubsub/subscriber.rb', line 146
    def wait!
      wait_pool = synchronize do
        @stream_pool.map do |stream|
          Thread.new { stream.wait! }
        end
      end
      wait_pool.map(&:join)

      self
    end
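Since #start, #stop and #wait! each return the subscriber, a graceful shutdown can be written as one chained call. The helper below is a sketch that relies only on the methods documented on this page; `run_until` and `FakeSubscriber` are hypothetical names, introduced so the example runs without a Pub/Sub connection.

```ruby
# Hypothetical helper: start the subscriber, run the block, then stop
# pulling new messages and wait for the received ones to be handled.
def run_until subscriber
  subscriber.start
  yield subscriber
ensure
  # #stop returns the subscriber, so #wait! can be chained onto it.
  subscriber.stop.wait!
end

# Hypothetical stand-in exposing the same lifecycle methods.
class FakeSubscriber
  attr_reader :calls

  def initialize
    @calls = []
    @started = false
    @stopped = false
  end

  def start
    @started = true
    @stopped = false
    @calls << :start
    self
  end

  def stop
    @started = false
    @stopped = true
    @calls << :stop
    self
  end

  def wait!
    @calls << :wait!
    self
  end

  def started?
    @started
  end

  def stopped?
    @stopped
  end
end

fake = FakeSubscriber.new
was_started = nil
run_until(fake) { |subscriber| was_started = subscriber.started? }
```

Placing the shutdown in `ensure` means the subscriber is stopped and drained even when the block raises.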