Class: Google::Cloud::Pubsub::Subscriber
- Inherits:
-
Object
- Object
- Google::Cloud::Pubsub::Subscriber
- Includes:
- MonitorMixin
- Defined in:
- lib/google/cloud/pubsub/subscriber.rb,
lib/google/cloud/pubsub/subscriber/stream.rb,
lib/google/cloud/pubsub/subscriber/async_pusher.rb,
lib/google/cloud/pubsub/subscriber/enumerator_queue.rb
Overview
Subscriber object used to stream and process messages from a Subscription. See Google::Cloud::Pubsub::Subscription#listen
Instance Attribute Summary collapse
-
#callback ⇒ Proc
readonly
The procedure that will handle the messages received from the subscription.
-
#callback_threads ⇒ Integer
readonly
The number of threads used to handle the received messages.
-
#deadline ⇒ Numeric
readonly
The default number of seconds the stream will hold received messages before modifying the message's ack deadline.
-
#inventory ⇒ Integer
readonly
The number of received messages to be collected by subscriber.
-
#push_threads ⇒ Integer
readonly
The number of threads to handle acknowledgement (ReceivedMessage#ack!) and delay messages (ReceivedMessage#nack!, ReceivedMessage#delay!).
-
#streams ⇒ Integer
readonly
The number of concurrent streams to open to pull messages from the subscription.
-
#subscription_name ⇒ String
readonly
The name of the subscription the messages are pulled from.
Instance Method Summary collapse
-
#start ⇒ Subscriber
Starts the subscriber pulling from the subscription and processing the received messages.
-
#started? ⇒ boolean
Whether the subscriber has been started.
-
#stop ⇒ Subscriber
Begins the process of stopping the subscriber.
-
#stopped? ⇒ boolean
Whether the subscriber has been stopped.
-
#wait! ⇒ Subscriber
Blocks until the subscriber is fully stopped and all received messages have been handled.
Instance Attribute Details
#callback ⇒ Proc (readonly)
The procedure that will handle the messages received from the subscription.
62 63 64 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 62 def callback @callback end |
#callback_threads ⇒ Integer (readonly)
The number of threads used to handle the received messages. Default is 8.
62 63 64 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 62 def callback_threads @callback_threads end |
#deadline ⇒ Numeric (readonly)
The default number of seconds the stream will hold received messages before modifying the message's ack deadline. The minimum is 10, the maximum is 600. Default is 60.
62 63 64 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 62 def deadline @deadline end |
#inventory ⇒ Integer (readonly)
The number of received messages to be collected by subscriber. Default is 1,000.
62 63 64 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 62 def inventory @inventory end |
#push_threads ⇒ Integer (readonly)
The number of threads to handle acknowledgement (ReceivedMessage#ack!) and delay messages (ReceivedMessage#nack!, ReceivedMessage#delay!). Default is 4.
62 63 64 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 62 def push_threads @push_threads end |
#streams ⇒ Integer (readonly)
The number of concurrent streams to open to pull messages from the subscription. Default is 4.
62 63 64 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 62 def streams @streams end |
#subscription_name ⇒ String (readonly)
The name of the subscription the messages are pulled from.
62 63 64 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 62 def subscription_name @subscription_name end |
Instance Method Details
#start ⇒ Subscriber
Starts the subscriber pulling from the subscription and processing the received messages.
103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 103 def start start_pool = synchronize do @started = true @stopped = false @stream_pool.map do |stream| Thread.new { stream.start } end end start_pool.map(&:join) self end |
#started? ⇒ boolean
Whether the subscriber has been started.
160 161 162 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 160 def started? synchronize { @started } end |
#stop ⇒ Subscriber
Begins the process of stopping the subscriber. Unhandled received messages will be processed, but no new messages will be pulled from the subscription. Use #wait! to block until the subscriber is fully stopped and all received messages have been processed.
124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 124 def stop stop_pool = synchronize do @started = false @stopped = true @stream_pool.map do |stream| Thread.new { stream.stop } end end stop_pool.map(&:join) self end |
#stopped? ⇒ boolean
Whether the subscriber has been stopped.
168 169 170 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 168 def stopped? synchronize { @stopped } end |
#wait! ⇒ Subscriber
145 146 147 148 149 150 151 152 153 154 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 145 def wait! wait_pool = synchronize do @stream_pool.map do |stream| Thread.new { stream.wait! } end end wait_pool.map(&:join) self end |