Class: Google::Cloud::Pubsub::Subscriber

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/google/cloud/pubsub/subscriber.rb,
lib/google/cloud/pubsub/subscriber/stream.rb,
lib/google/cloud/pubsub/subscriber/async_pusher.rb,
lib/google/cloud/pubsub/subscriber/enumerator_queue.rb

Overview

Subscriber object used to stream and process messages from a Subscription. See Google::Cloud::Pubsub::Subscription#listen

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen do |msg|
  # process msg
  msg.ack!
end

subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop.wait!

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#callbackProc (readonly)

The procedure that will handle the messages received from the subscription.

Returns:

  • (Proc)

    the current value of callback



61
62
63
# File 'lib/google/cloud/pubsub/subscriber.rb', line 61

def callback
  @callback
end

#callback_threadsInteger (readonly)

The number of threads used to handle the received messages. Default is 8.

Returns:

  • (Integer)

    the current value of callback_threads



61
62
63
# File 'lib/google/cloud/pubsub/subscriber.rb', line 61

def callback_threads
  @callback_threads
end

#deadlineNumeric (readonly)

The default number of seconds the stream will hold received messages before modifying the message's ack deadline. The minimum is 10, the maximum is 600. Default is 60.

Returns:

  • (Numeric)

    the current value of deadline



61
62
63
# File 'lib/google/cloud/pubsub/subscriber.rb', line 61

def deadline
  @deadline
end

#inventoryInteger (readonly)

The number of received messages to be collected by subscriber. Default is 1,000.

Returns:

  • (Integer)

    the current value of inventory



61
62
63
# File 'lib/google/cloud/pubsub/subscriber.rb', line 61

def inventory
  @inventory
end

#push_threadsInteger (readonly)

The number of threads to handle acknowledgement (ReceivedMessage#ack!) and delay messages (ReceivedMessage#nack!, ReceivedMessage#delay!). Default is 4.

Returns:

  • (Integer)

    the current value of push_threads



61
62
63
# File 'lib/google/cloud/pubsub/subscriber.rb', line 61

def push_threads
  @push_threads
end

#streamsInteger (readonly)

The number of concurrent streams to open to pull messages from the subscription. Default is 4.

Returns:

  • (Integer)

    the current value of streams



61
62
63
# File 'lib/google/cloud/pubsub/subscriber.rb', line 61

def streams
  @streams
end

#subscription_nameString (readonly)

The name of the subscription the messages are pulled from.

Returns:

  • (String)

    the current value of subscription_name



61
62
63
# File 'lib/google/cloud/pubsub/subscriber.rb', line 61

def subscription_name
  @subscription_name
end

Instance Method Details

#startSubscriber

Starts the subscriber pulling from the subscription and processing the received messages.

Returns:

  • (Subscriber)

    returns self so calls can be chained.



102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/google/cloud/pubsub/subscriber.rb', line 102

def start
  start_pool = synchronize do
    @started = true
    @stopped = false

    @stream_pool.map do |stream|
      Thread.new { stream.start }
    end
  end
  start_pool.join

  self
end

#started?boolean

Whether the subscriber has been started.

Returns:

  • (boolean)

    true when started, false otherwise.



159
160
161
# File 'lib/google/cloud/pubsub/subscriber.rb', line 159

def started?
  synchronize { @started }
end

#stopSubscriber

Begins the process of stopping the subscriber. Unhandled received messages will be processed, but no new messages will be pulled from the subscription. Use #wait! to block until the subscriber is fully stopped and all received messages have been processed.

Returns:

  • (Subscriber)

    returns self so calls can be chained.



123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/google/cloud/pubsub/subscriber.rb', line 123

def stop
  stop_pool = synchronize do
    @started = false
    @stopped = true

    @stream_pool.map do |stream|
      Thread.new { stream.stop }
    end
  end
  stop_pool.join

  self
end

#stopped?boolean

Whether the subscriber has been stopped.

Returns:

  • (boolean)

    true when stopped, false otherwise.



167
168
169
# File 'lib/google/cloud/pubsub/subscriber.rb', line 167

def stopped?
  synchronize { @stopped }
end

#wait!Subscriber

Blocks until the subscriber is fully stopped and all received messages have been handled. Does not stop the subscriber. To stop the subscriber, first call #stop and then call #wait! to block until the subscriber is stopped.

Returns:

  • (Subscriber)

    returns self so calls can be chained.



144
145
146
147
148
149
150
151
152
153
# File 'lib/google/cloud/pubsub/subscriber.rb', line 144

def wait!
  wait_pool = synchronize do
    @stream_pool.map do |stream|
      Thread.new { stream.wait! }
    end
  end
  wait_pool.join

  self
end