Class: Google::Cloud::Pubsub::AsyncPublisher

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/google/cloud/pubsub/async_publisher.rb

Overview

Used to publish multiple messages in batches to a topic. See Topic#async_publisher

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic "my-topic"
topic.publish_async "task completed" do |result|
  if result.succeeded?
    log_publish_success result.data
  else
    log_publish_failure result.data, result.error
  end
end

topic.async_publisher.stop.wait!

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#callback_threadsNumeric (readonly)

The number of threads to handle the published messages' callbacks. Default is 8.

Returns:

  • (Numeric)

    the current value of callback_threads



59
60
61
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 59

def callback_threads
  @callback_threads
end

#intervalNumeric (readonly)

The number of seconds to collect messages before the batch is published. Default is 0.25.

Returns:

  • (Numeric)

    the current value of interval



59
60
61
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 59

def interval
  @interval
end

#max_bytesInteger (readonly)

The maximum size of messages to be collected before the batch is published. Default is 10,000,000 (10MB).

Returns:

  • (Integer)

    the current value of max_bytes



59
60
61
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 59

def max_bytes
  @max_bytes
end

#max_messagesInteger (readonly)

The maximum number of messages to be collected before the batch is published. Default is 1,000.

Returns:

  • (Integer)

    the current value of max_messages



59
60
61
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 59

def max_messages
  @max_messages
end

#publish_threadsNumeric (readonly)

The number of threads used to publish messages. Default is 4.

Returns:

  • (Numeric)

    the current value of publish_threads



59
60
61
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 59

def publish_threads
  @publish_threads
end

#topic_nameString (readonly)

The name of the topic the messages are published to. In the form of "/projects/project-identifier/topics/topic-name".

Returns:

  • (String)

    the current value of topic_name



59
60
61
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 59

def topic_name
  @topic_name
end

Instance Method Details

#flushAsyncPublisher

Forces all messages in the current batch to be published immediately.

Returns:



165
166
167
168
169
170
171
172
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 165

def flush
  synchronize do
    publish_batch!
    @cond.signal
  end

  self
end

#publish(data = nil, attributes = {}, &block) ⇒ Object

Add a message to the async publisher to be published to the topic. Messages will be collected in batches and published together. See Topic#publish_async



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 92

def publish data = nil, attributes = {}, &block
  msg = create_pubsub_message data, attributes

  synchronize do
    fail "Can't publish when stopped." if @stopped

    if @batch.nil?
      @batch ||= Batch.new self
      @batch.add msg, block
    else
      unless @batch.try_add msg, block
        publish_batch!
        @batch = Batch.new self
        @batch.add msg, block
      end
    end

    init_resources!

    publish_batch! if @batch.ready?

    @cond.signal
  end
  nil
end

#started?boolean

Whether the publisher has been started.

Returns:

  • (boolean)

    true when started, false otherwise.



178
179
180
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 178

def started?
  !stopped?
end

#stopAsyncPublisher

Begins the process of stopping the publisher. Messages already in the queue will be published, but no new messages can be added. Use #wait! to block until the publisher is fully stopped and all pending messages have been published.

Returns:



125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 125

def stop
  synchronize do
    break if @stopped

    @stopped = true
    publish_batch!
    @cond.signal
    @publish_thread_pool.shutdown if @publish_thread_pool
  end

  self
end

#stopped?boolean

Whether the publisher has been stopped.

Returns:

  • (boolean)

    true when stopped, false otherwise.



186
187
188
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 186

def stopped?
  synchronize { @stopped }
end

#wait!(timeout = nil) ⇒ AsyncPublisher

Blocks until the publisher is fully stopped, all pending messages have been published, and all callbacks have completed. Does not stop the publisher. To stop the publisher, first call #stop and then call #wait! to block until the publisher is stopped.

Returns:



145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 145

def wait! timeout = nil
  synchronize do
    if @publish_thread_pool
      @publish_thread_pool.wait_for_termination timeout
    end

    if @callback_thread_pool
      @callback_thread_pool.shutdown
      @callback_thread_pool.wait_for_termination timeout
    end
  end

  self
end