Class: Google::Cloud::Pubsub::Subscriber

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/google/cloud/pubsub/subscriber.rb,
lib/google/cloud/pubsub/subscriber/stream.rb,
lib/google/cloud/pubsub/subscriber/enumerator_queue.rb,
lib/google/cloud/pubsub/subscriber/async_unary_pusher.rb,
lib/google/cloud/pubsub/subscriber/async_stream_pusher.rb

Overview

Subscriber object used to stream and process messages from a Subscription. See Google::Cloud::Pubsub::Subscription#listen

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end

# Start background threads that will call the block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop.wait!

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#callbackProc (readonly)

The procedure that will handle the messages received from the subscription.

Returns:

  • (Proc)

    the current value of callback



62
63
64
# File 'lib/google/cloud/pubsub/subscriber.rb', line 62

def callback
  @callback
end

#callback_threadsInteger (readonly)

The number of threads used to handle the received messages. Default is 8.

Returns:

  • (Integer)

    the current value of callback_threads



62
63
64
# File 'lib/google/cloud/pubsub/subscriber.rb', line 62

def callback_threads
  @callback_threads
end

#deadlineNumeric (readonly)

The default number of seconds the stream will hold received messages before modifying the message's ack deadline. The minimum is 10, the maximum is 600. Default is 60.

Returns:

  • (Numeric)

    the current value of deadline



62
63
64
# File 'lib/google/cloud/pubsub/subscriber.rb', line 62

def deadline
  @deadline
end

#inventoryInteger (readonly)

The number of received messages to be collected by subscriber. Default is 1,000.

Returns:

  • (Integer)

    the current value of inventory



62
63
64
# File 'lib/google/cloud/pubsub/subscriber.rb', line 62

def inventory
  @inventory
end

#push_threadsInteger (readonly)

The number of threads to handle acknowledgement (ReceivedMessage#ack!) and delay messages (ReceivedMessage#nack!, ReceivedMessage#delay!). Default is 4.

Returns:

  • (Integer)

    the current value of push_threads



62
63
64
# File 'lib/google/cloud/pubsub/subscriber.rb', line 62

def push_threads
  @push_threads
end

#streamsInteger (readonly)

The number of concurrent streams to open to pull messages from the subscription. Default is 4.

Returns:

  • (Integer)

    the current value of streams



62
63
64
# File 'lib/google/cloud/pubsub/subscriber.rb', line 62

def streams
  @streams
end

#subscription_nameString (readonly)

The name of the subscription the messages are pulled from.

Returns:

  • (String)

    the current value of subscription_name



62
63
64
# File 'lib/google/cloud/pubsub/subscriber.rb', line 62

def subscription_name
  @subscription_name
end

Instance Method Details

#startSubscriber

Starts the subscriber pulling from the subscription and processing the received messages.

Returns:

  • (Subscriber)

    returns self so calls can be chained.



103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/google/cloud/pubsub/subscriber.rb', line 103

def start
  start_pool = synchronize do
    @started = true
    @stopped = false

    @stream_pool.map do |stream|
      Thread.new { stream.start }
    end
  end
  start_pool.map(&:join)

  self
end

#started?boolean

Whether the subscriber has been started.

Returns:

  • (boolean)

    true when started, false otherwise.



160
161
162
# File 'lib/google/cloud/pubsub/subscriber.rb', line 160

def started?
  synchronize { @started }
end

#stopSubscriber

Begins the process of stopping the subscriber. Unhandled received messages will be processed, but no new messages will be pulled from the subscription. Use #wait! to block until the subscriber is fully stopped and all received messages have been processed.

Returns:

  • (Subscriber)

    returns self so calls can be chained.



124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/google/cloud/pubsub/subscriber.rb', line 124

def stop
  stop_pool = synchronize do
    @started = false
    @stopped = true

    @stream_pool.map do |stream|
      Thread.new { stream.stop }
    end
  end
  stop_pool.map(&:join)

  self
end

#stopped?boolean

Whether the subscriber has been stopped.

Returns:

  • (boolean)

    true when stopped, false otherwise.



168
169
170
# File 'lib/google/cloud/pubsub/subscriber.rb', line 168

def stopped?
  synchronize { @stopped }
end

#wait!Subscriber

Blocks until the subscriber is fully stopped and all received messages have been handled. Does not stop the subscriber. To stop the subscriber, first call #stop and then call #wait! to block until the subscriber is stopped.

Returns:

  • (Subscriber)

    returns self so calls can be chained.



145
146
147
148
149
150
151
152
153
154
# File 'lib/google/cloud/pubsub/subscriber.rb', line 145

def wait!
  wait_pool = synchronize do
    @stream_pool.map do |stream|
      Thread.new { stream.wait! }
    end
  end
  wait_pool.map(&:join)

  self
end