Class: Google::Cloud::Pubsub::Subscriber

Inherits:
Object
Includes:
MonitorMixin
Defined in:
lib/google/cloud/pubsub/subscriber.rb,
lib/google/cloud/pubsub/subscriber/stream.rb,
lib/google/cloud/pubsub/subscriber/inventory.rb,
lib/google/cloud/pubsub/subscriber/enumerator_queue.rb,
lib/google/cloud/pubsub/subscriber/async_unary_pusher.rb,
lib/google/cloud/pubsub/subscriber/async_stream_pusher.rb

Overview

Subscriber object used to stream and process messages from a Subscription. See Google::Cloud::Pubsub::Subscription#listen.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end

# Start background threads that will call the block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop.wait!

Instance Attribute Details

#callback ⇒ Proc (readonly)

The procedure that will handle the messages received from the subscription.

Returns:

  • (Proc)

    the current value of callback



# File 'lib/google/cloud/pubsub/subscriber.rb', line 62

def callback
  @callback
end

#callback_threads ⇒ Integer (readonly)

The number of threads used to handle the received messages. Default is 8.

Returns:

  • (Integer)

    the current value of callback_threads



# File 'lib/google/cloud/pubsub/subscriber.rb', line 62

def callback_threads
  @callback_threads
end

#deadline ⇒ Numeric (readonly)

The default number of seconds the stream will hold received messages before modifying the message's ack deadline. The minimum is 10, the maximum is 600. Default is 60.

Returns:

  • (Numeric)

    the current value of deadline



# File 'lib/google/cloud/pubsub/subscriber.rb', line 62

def deadline
  @deadline
end

#inventory ⇒ Integer (readonly)

The number of received messages to be collected by the subscriber. Default is 1,000.

Returns:

  • (Integer)

    the current value of inventory



# File 'lib/google/cloud/pubsub/subscriber.rb', line 62

def inventory
  @inventory
end

#push_threads ⇒ Integer (readonly)

The number of threads used to send acknowledgements (ReceivedMessage#ack!) and ack deadline changes (ReceivedMessage#nack!, ReceivedMessage#delay!). Default is 4.

Returns:

  • (Integer)

    the current value of push_threads



# File 'lib/google/cloud/pubsub/subscriber.rb', line 62

def push_threads
  @push_threads
end

#streams ⇒ Integer (readonly)

The number of concurrent streams to open to pull messages from the subscription. Default is 4.

Returns:

  • (Integer)

    the current value of streams



# File 'lib/google/cloud/pubsub/subscriber.rb', line 62

def streams
  @streams
end

#subscription_name ⇒ String (readonly)

The name of the subscription the messages are pulled from.

Returns:

  • (String)

    the current value of subscription_name



# File 'lib/google/cloud/pubsub/subscriber.rb', line 62

def subscription_name
  @subscription_name
end
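
The documented defaults and the deadline bounds above can be gathered in one place. The following is a minimal plain-Ruby sketch, not the library's implementation: `SubscriberSettings` and `validate_deadline!` are hypothetical names, and raising on an out-of-range deadline is an assumption, since this page does not say how the library treats such values.

```ruby
# Hypothetical container mirroring the documented Subscriber attributes.
# Only the default values and the 10..600 deadline range come from the docs.
class SubscriberSettings
  DEADLINE_RANGE = (10..600).freeze

  DEFAULTS = {
    deadline:         60,
    streams:          4,
    inventory:        1000,
    callback_threads: 8,
    push_threads:     4
  }.freeze

  # Read-only accessors, like the (readonly) attributes on Subscriber.
  attr_reader(*DEFAULTS.keys)

  def initialize(**overrides)
    unknown = overrides.keys - DEFAULTS.keys
    raise ArgumentError, "unknown settings: #{unknown.inspect}" unless unknown.empty?

    DEFAULTS.merge(overrides).each do |name, value|
      instance_variable_set "@#{name}", value
    end
    validate_deadline!
  end

  private

  # Assumption: reject deadlines outside the documented range.
  def validate_deadline!
    return if DEADLINE_RANGE.cover? deadline
    raise ArgumentError,
          "deadline must be between 10 and 600 seconds, got #{deadline}"
  end
end

# Override two values and keep the documented defaults for the rest.
settings = SubscriberSettings.new deadline: 120, streams: 2
```

In the real library these values are chosen when the subscriber is created through Subscription#listen; the sketch only illustrates how the defaults and bounds relate.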

Instance Method Details

#last_error ⇒ Exception?

The most recent unhandled error to occur while listening to messages on the subscriber.

If an unhandled error has occurred, the subscriber will attempt to recover from the error and resume listening.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end

# Start listening for messages and errors.
subscriber.start

# If an error was raised, it can be retrieved here:
subscriber.last_error #=> nil

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop.wait!

Returns:

  • (Exception, nil)

    The most recent error raised, or nil if no error has been raised.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 244

def last_error
  synchronize { @last_error }
end

#on_error {|callback| ... } ⇒ Object

Register to be notified of errors when raised.

If an unhandled error has occurred, the subscriber will attempt to recover from the error and resume listening.

Multiple error handlers can be added.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end

# Register to be notified when unhandled errors occur.
subscriber.on_error do |error|
  # log error
  puts error
end

# Start listening for messages and errors.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop.wait!

Yields:

  • (callback)

    The block to be called when an error is raised.

Yield Parameters:

  • error (Exception)

    The error raised.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 208

def on_error &block
  synchronize do
    @error_callbacks << block
  end
end
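
To illustrate how several registered handlers and #last_error can fit together, here is a minimal self-contained sketch. `ErrorNotifier` and its `report` method are hypothetical; the source only shows handlers being appended to `@error_callbacks` under `synchronize`, so how the library actually dispatches errors is an assumption here.

```ruby
require "monitor"

# Hypothetical stand-in for the error-handling part of a subscriber.
class ErrorNotifier
  include MonitorMixin

  def initialize
    super() # lets MonitorMixin set up its lock
    @error_callbacks = []
    @last_error = nil
  end

  # Registers a handler. May be called more than once.
  def on_error &block
    synchronize { @error_callbacks << block }
  end

  # Returns the most recently reported error, or nil.
  def last_error
    synchronize { @last_error }
  end

  # Hypothetical: records the error, then notifies every handler in
  # registration order.
  def report error
    callbacks = synchronize do
      @last_error = error
      @error_callbacks.dup
    end
    # Handlers run outside the lock so a slow handler does not block
    # other threads that call on_error or last_error.
    callbacks.each { |callback| callback.call error }
  end
end

notifier = ErrorNotifier.new
seen = []
notifier.on_error { |error| seen << "logged: #{error.message}" }
notifier.on_error { |error| seen << "counted: #{error.class}" }
notifier.report RuntimeError.new("stream reset")
# seen is now ["logged: stream reset", "counted: RuntimeError"]
```

Copying the handler list while holding the lock and invoking the handlers afterwards is one possible design choice; it keeps registration thread-safe without holding the monitor during user code.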

#start ⇒ Subscriber

Starts the subscriber pulling from the subscription and processing the received messages.

Returns:

  • (Subscriber)

    returns self so calls can be chained.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 104

def start
  start_pool = synchronize do
    @started = true
    @stopped = false

    @stream_pool.map do |stream|
      Thread.new { stream.start }
    end
  end
  start_pool.map(&:join)

  self
end

#started? ⇒ boolean

Whether the subscriber has been started.

Returns:

  • (boolean)

    true when started, false otherwise.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 161

def started?
  synchronize { @started }
end

#stop ⇒ Subscriber

Begins the process of stopping the subscriber. Unhandled received messages will be processed, but no new messages will be pulled from the subscription. Use #wait! to block until the subscriber is fully stopped and all received messages have been processed.

Returns:

  • (Subscriber)

    returns self so calls can be chained.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 125

def stop
  stop_pool = synchronize do
    @started = false
    @stopped = true

    @stream_pool.map do |stream|
      Thread.new { stream.stop }
    end
  end
  stop_pool.map(&:join)

  self
end

#stopped? ⇒ boolean

Whether the subscriber has been stopped.

Returns:

  • (boolean)

    true when stopped, false otherwise.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 169

def stopped?
  synchronize { @stopped }
end

#wait! ⇒ Subscriber

Blocks until the subscriber is fully stopped and all received messages have been handled. Does not stop the subscriber. To stop the subscriber, first call #stop and then call #wait! to block until the subscriber is stopped.

Returns:

  • (Subscriber)

    returns self so calls can be chained.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 146

def wait!
  wait_pool = synchronize do
    @stream_pool.map do |stream|
      Thread.new { stream.wait! }
    end
  end
  wait_pool.map(&:join)

  self
end
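
The #start, #stop, and #wait! methods share one pattern: under the monitor, spawn one thread per stream, then join those threads and return self. The sketch below reproduces that pattern without the Pub/Sub service. `FakeStream`, `FakeSubscriber`, and `run_on_all_streams` are hypothetical names, and the fake stream's worker loop is an assumption standing in for real message handling.

```ruby
require "monitor"

# Hypothetical stream: its worker runs until a stop is requested.
class FakeStream
  def initialize
    @stop_requested = false
    @worker = nil
  end

  def start
    @worker = Thread.new { sleep 0.01 until @stop_requested }
    self
  end

  # Only requests the stop; it does not wait for the worker.
  def stop
    @stop_requested = true
    self
  end

  # Blocks until the worker has finished.
  def wait!
    @worker&.join
    self
  end
end

# Hypothetical subscriber following the fan-out and join pattern.
class FakeSubscriber
  include MonitorMixin

  def initialize stream_count
    super() # lets MonitorMixin set up its lock
    @stream_pool = Array.new(stream_count) { FakeStream.new }
    @started = false
    @stopped = false
  end

  def start
    run_on_all_streams(:start) { @started, @stopped = true, false }
  end

  def stop
    run_on_all_streams(:stop) { @started, @stopped = false, true }
  end

  def wait!
    run_on_all_streams(:wait!)
  end

  def started?
    synchronize { @started }
  end

  def stopped?
    synchronize { @stopped }
  end

  private

  # Updates state and spawns one thread per stream while holding the
  # lock, joins the threads outside it, and returns self for chaining.
  def run_on_all_streams method_name
    threads = synchronize do
      yield if block_given?
      @stream_pool.map do |stream|
        Thread.new { stream.public_send method_name }
      end
    end
    threads.each(&:join)
    self
  end
end

subscriber = FakeSubscriber.new 4
subscriber.start
subscriber.stop.wait! # returns once every fake worker has exited
```

Because each method returns self, `subscriber.stop.wait!` reads as a single step: request the stop, then block until the remaining work has drained.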