Class: Google::Cloud::Pubsub::Subscriber
- Inherits: Object
  - Object
  - Google::Cloud::Pubsub::Subscriber
- Includes: MonitorMixin
- Defined in:
  lib/google/cloud/pubsub/subscriber.rb,
  lib/google/cloud/pubsub/subscriber/stream.rb,
  lib/google/cloud/pubsub/subscriber/async_pusher.rb,
  lib/google/cloud/pubsub/subscriber/enumerator_queue.rb
Overview
Subscriber object used to stream and process messages from a Subscription. See Google::Cloud::Pubsub::Subscription#listen.
Instance Attribute Summary
- #callback ⇒ Proc (readonly)
  The procedure that will handle the messages received from the subscription.
- #callback_threads ⇒ Integer (readonly)
  The number of threads used to handle the received messages.
- #deadline ⇒ Numeric (readonly)
  The default number of seconds the stream will hold received messages before modifying the message's ack deadline.
- #inventory ⇒ Integer (readonly)
  The number of received messages to be collected by the subscriber.
- #push_threads ⇒ Integer (readonly)
  The number of threads used to handle acknowledgements (ReceivedMessage#ack!) and delays (ReceivedMessage#nack!, ReceivedMessage#delay!).
- #streams ⇒ Integer (readonly)
  The number of concurrent streams to open to pull messages from the subscription.
- #subscription_name ⇒ String (readonly)
  The name of the subscription the messages are pulled from.
Instance Method Summary
- #start ⇒ Subscriber
  Starts the subscriber pulling from the subscription and processing the received messages.
- #started? ⇒ boolean
  Whether the subscriber has been started.
- #stop ⇒ Subscriber
  Begins the process of stopping the subscriber.
- #stopped? ⇒ boolean
  Whether the subscriber has been stopped.
- #wait! ⇒ Subscriber
  Blocks until the subscriber is fully stopped and all received messages have been handled.
Instance Attribute Details
#callback ⇒ Proc (readonly)
The procedure that will handle the messages received from the subscription.
    # File 'lib/google/cloud/pubsub/subscriber.rb', line 61
    def callback
      @callback
    end
#callback_threads ⇒ Integer (readonly)
The number of threads used to handle the received messages. Default is 8.
    # File 'lib/google/cloud/pubsub/subscriber.rb', line 61
    def callback_threads
      @callback_threads
    end
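As a rough illustration of what a fixed number of callback threads means, the sketch below drains a queue of messages with a small pool of worker threads. It is a toy model, not the library's implementation: `run_callbacks` and its parameters are hypothetical names introduced only for this example.

```ruby
# Toy model only: a fixed pool of worker threads draining a queue.
# `run_callbacks` and its parameters are hypothetical, not library API.
def run_callbacks(messages, callback_threads: 8, &callback)
  queue = Queue.new
  messages.each { |message| queue << message }
  queue.close # once closed and empty, Queue#pop returns nil

  workers = Array.new(callback_threads) do
    Thread.new do
      # Each worker handles messages until the queue is drained.
      while (message = queue.pop)
        callback.call message
      end
    end
  end

  workers.each(&:join) # wait for every worker to finish
  nil
end

handled = Queue.new
run_callbacks(%w[a b c d e], callback_threads: 2) { |m| handled << m.upcase }
results = Array.new(handled.size) { handled.pop }.sort
```

Messages are handled concurrently, so the order in which the callback sees them is not guaranteed; the example sorts the results before inspecting them.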
#deadline ⇒ Numeric (readonly)
The default number of seconds the stream will hold received messages before modifying the message's ack deadline. The minimum is 10, the maximum is 600. Default is 60.
    # File 'lib/google/cloud/pubsub/subscriber.rb', line 61
    def deadline
      @deadline
    end
#inventory ⇒ Integer (readonly)
The number of received messages to be collected by the subscriber. Default is 1,000.
    # File 'lib/google/cloud/pubsub/subscriber.rb', line 61
    def inventory
      @inventory
    end
#push_threads ⇒ Integer (readonly)
The number of threads used to handle acknowledgements (ReceivedMessage#ack!) and delays (ReceivedMessage#nack!, ReceivedMessage#delay!). Default is 4.
    # File 'lib/google/cloud/pubsub/subscriber.rb', line 61
    def push_threads
      @push_threads
    end
#streams ⇒ Integer (readonly)
The number of concurrent streams to open to pull messages from the subscription. Default is 4.
    # File 'lib/google/cloud/pubsub/subscriber.rb', line 61
    def streams
      @streams
    end
#subscription_name ⇒ String (readonly)
The name of the subscription the messages are pulled from.
    # File 'lib/google/cloud/pubsub/subscriber.rb', line 61
    def subscription_name
      @subscription_name
    end
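The documented defaults and the deadline bounds can be gathered in one place. The sketch below is only an illustration of those values: `LISTEN_DEFAULTS`, `DEADLINE_RANGE`, and `listen_options` are hypothetical names, and raising on an out-of-range deadline is an assumption, since this page does not say how the library treats such values.

```ruby
# Hypothetical helper, not part of the library: collects the defaults
# documented above and checks the documented deadline bounds.
LISTEN_DEFAULTS = {
  deadline: 60,
  streams: 4,
  inventory: 1000,
  callback_threads: 8,
  push_threads: 4
}.freeze

DEADLINE_RANGE = (10..600).freeze

def listen_options(overrides = {})
  unknown = overrides.keys - LISTEN_DEFAULTS.keys
  raise ArgumentError, "unknown options: #{unknown.join(', ')}" unless unknown.empty?

  options = LISTEN_DEFAULTS.merge(overrides)
  # Assumption: reject values outside the documented 10..600 second range.
  unless DEADLINE_RANGE.cover?(options[:deadline])
    raise ArgumentError, "deadline must be between 10 and 600 seconds"
  end
  options
end

options = listen_options(deadline: 120, streams: 2)
```

Options that are not overridden keep their documented defaults, so `options[:callback_threads]` is 8 here.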
Instance Method Details
#start ⇒ Subscriber
Starts the subscriber pulling from the subscription and processing the received messages.
    # File 'lib/google/cloud/pubsub/subscriber.rb', line 102
    def start
      start_pool = synchronize do
        @started = true
        @stopped = false

        @stream_pool.map do |stream|
          Thread.new { stream.start }
        end
      end
      start_pool.join

      self
    end
#started? ⇒ boolean
Whether the subscriber has been started.
    # File 'lib/google/cloud/pubsub/subscriber.rb', line 159
    def started?
      synchronize { @started }
    end
#stop ⇒ Subscriber
Begins the process of stopping the subscriber. Unhandled received messages will be processed, but no new messages will be pulled from the subscription. Use #wait! to block until the subscriber is fully stopped and all received messages have been processed.
    # File 'lib/google/cloud/pubsub/subscriber.rb', line 123
    def stop
      stop_pool = synchronize do
        @started = false
        @stopped = true

        @stream_pool.map do |stream|
          Thread.new { stream.stop }
        end
      end
      stop_pool.join

      self
    end
#stopped? ⇒ boolean
Whether the subscriber has been stopped.
    # File 'lib/google/cloud/pubsub/subscriber.rb', line 167
    def stopped?
      synchronize { @stopped }
    end
#wait! ⇒ Subscriber
Blocks until the subscriber is fully stopped and all received messages have been handled.
    # File 'lib/google/cloud/pubsub/subscriber.rb', line 144
    def wait!
      wait_pool = synchronize do
        @stream_pool.map do |stream|
          Thread.new { stream.wait! }
        end
      end
      wait_pool.join

      self
    end
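The start, stop, and wait! lifecycle described above can be sketched with a small self-contained model. `ToySubscriber` is a hypothetical stand-in written for this example. It mirrors the method names and the use of MonitorMixin, but its worker threads and queues are assumptions and say nothing about how the library's streams work internally. The sketch waits for its workers by calling Thread#join on each thread.

```ruby
require "monitor"

# Toy model only: ToySubscriber and its workers are hypothetical stand-ins
# for the library's Subscriber and its internal streams.
class ToySubscriber
  include MonitorMixin

  attr_reader :handled

  def initialize(messages, streams: 4)
    super() # initializes the monitor provided by MonitorMixin
    @pending = Queue.new
    messages.each { |message| @pending << message }
    @handled = Queue.new
    @streams = streams
    @threads = []
    @started = false
    @stopped = false
  end

  def start
    synchronize do
      @started = true
      @stopped = false
      @threads = Array.new(@streams) do
        Thread.new do
          # Queue#pop returns nil once the queue is closed and empty.
          while (message = @pending.pop)
            @handled << message
          end
        end
      end
    end
    self
  end

  def stop
    synchronize do
      @started = false
      @stopped = true
      @pending.close # accept nothing new; queued messages are still handled
    end
    self
  end

  def wait!
    threads = synchronize { @threads.dup }
    threads.each(&:join) # block until every worker has finished
    self
  end

  def started?
    synchronize { @started }
  end

  def stopped?
    synchronize { @stopped }
  end
end

subscriber = ToySubscriber.new([1, 2, 3], streams: 2).start
subscriber.stop.wait!
```

Because #stop and #wait! both return the subscriber, they can be chained as `subscriber.stop.wait!` to request shutdown and then block until the remaining messages are handled.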