Class: Google::Cloud::Pubsub::AsyncPublisher
- Inherits: Object
- Includes: MonitorMixin
- Defined in: lib/google/cloud/pubsub/async_publisher.rb
Overview
Publishes multiple messages to a topic in batches. See Topic#async_publisher.
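As a rough usage sketch, the helper below publishes a list of payloads through a topic and then drains the publisher. `publish_all` is a hypothetical name. It assumes `topic` behaves like a Google::Cloud::Pubsub::Topic, that is, it responds to `publish_async` (yielding a result that responds to `succeeded?`) and to `async_publisher`. The helper and the result check are illustrative assumptions, not part of this class.

```ruby
# Hypothetical helper: publish every payload asynchronously, then wait for
# the publisher to drain. Returns the payloads whose publish did not succeed.
def publish_all(topic, payloads)
  failed = []
  lock = Mutex.new # callbacks may run on several callback threads

  payloads.each do |payload|
    topic.publish_async payload do |result|
      # Assumption: the yielded result responds to succeeded?.
      lock.synchronize { failed << payload } unless result.succeeded?
    end
  end

  # stop begins shutdown; wait! blocks until pending messages and
  # callbacks have completed.
  publisher = topic.async_publisher
  publisher.stop.wait! if publisher

  failed
end
```

Because the helper stops the publisher, it suits one-shot scripts; a long-running process would typically keep the publisher alive and stop it only at exit.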
Instance Attribute Summary
- #callback_threads ⇒ Numeric (readonly)
  The number of threads to handle the published messages' callbacks.
- #interval ⇒ Numeric (readonly)
  The number of seconds to collect messages before the batch is published.
- #max_bytes ⇒ Integer (readonly)
  The maximum total size, in bytes, of the messages collected before the batch is published.
- #max_messages ⇒ Integer (readonly)
  The maximum number of messages to be collected before the batch is published.
- #publish_threads ⇒ Numeric (readonly)
  The number of threads used to publish messages.
- #topic_name ⇒ String (readonly)
  The name of the topic the messages are published to.
Instance Method Summary
- #flush ⇒ AsyncPublisher
  Forces all messages in the current batch to be published immediately.
- #publish(data = nil, attributes = {}, &block) ⇒ Object
  Add a message to the async publisher to be published to the topic.
- #started? ⇒ boolean
  Whether the publisher has been started.
- #stop ⇒ AsyncPublisher
  Begins the process of stopping the publisher.
- #stopped? ⇒ boolean
  Whether the publisher has been stopped.
- #wait!(timeout = nil) ⇒ AsyncPublisher
  Blocks until the publisher is fully stopped, all pending messages have been published, and all callbacks have completed.
Instance Attribute Details
#callback_threads ⇒ Numeric (readonly)
The number of threads to handle the published messages' callbacks. Default is 8.
    # File 'lib/google/cloud/pubsub/async_publisher.rb', line 59
    def callback_threads
      @callback_threads
    end
#interval ⇒ Numeric (readonly)
The number of seconds to collect messages before the batch is published. Default is 0.25.
    # File 'lib/google/cloud/pubsub/async_publisher.rb', line 59
    def interval
      @interval
    end
#max_bytes ⇒ Integer (readonly)
The maximum total size, in bytes, of the messages collected before the batch is published. Default is 10,000,000 (10 MB).
    # File 'lib/google/cloud/pubsub/async_publisher.rb', line 59
    def max_bytes
      @max_bytes
    end
#max_messages ⇒ Integer (readonly)
The maximum number of messages to be collected before the batch is published. Default is 1,000.
    # File 'lib/google/cloud/pubsub/async_publisher.rb', line 59
    def max_messages
      @max_messages
    end
#publish_threads ⇒ Numeric (readonly)
The number of threads used to publish messages. Default is 4.
    # File 'lib/google/cloud/pubsub/async_publisher.rb', line 59
    def publish_threads
      @publish_threads
    end
#topic_name ⇒ String (readonly)
The name of the topic the messages are published to. In the form of "/projects/project-identifier/topics/topic-name".
    # File 'lib/google/cloud/pubsub/async_publisher.rb', line 59
    def topic_name
      @topic_name
    end
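To make the size limits above concrete, here is a minimal sketch of how `max_messages` and `max_bytes` could partition a stream of payloads into batches. `split_into_batches` is a hypothetical helper, not part of the library; it measures each message by the byte size of its payload string, which is an assumption, and it does not model the time-based `interval` trigger at all.

```ruby
# Hypothetical helper: groups payload strings so that no batch holds more
# than max_messages entries or more than max_bytes bytes in total.
# A single payload larger than max_bytes still gets a batch of its own.
def split_into_batches(payloads, max_messages: 1_000, max_bytes: 10_000_000)
  batches = []
  current = []
  current_bytes = 0

  payloads.each do |payload|
    size = payload.bytesize
    full = current.size >= max_messages || current_bytes + size > max_bytes

    # Close the current batch when the next payload would not fit.
    if full && !current.empty?
      batches << current
      current = []
      current_bytes = 0
    end

    current << payload
    current_bytes += size
  end

  batches << current unless current.empty?
  batches
end
```

With `max_messages: 2`, five small payloads end up in three batches of two, two, and one message; lowering `max_bytes` instead makes the byte total the limiting factor.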
Instance Method Details
#flush ⇒ AsyncPublisher
Forces all messages in the current batch to be published immediately.
    # File 'lib/google/cloud/pubsub/async_publisher.rb', line 165
    def flush
      synchronize do
        publish_batch!
        @cond.signal
      end

      self
    end
#publish(data = nil, attributes = {}, &block) ⇒ Object
Adds a message to the async publisher to be published to the topic. Messages are collected in batches and published together. See Topic#publish_async.
    # File 'lib/google/cloud/pubsub/async_publisher.rb', line 92
    def publish data = nil, attributes = {}, &block
      msg = data, attributes

      synchronize do
        raise "Can't publish when stopped." if @stopped

        if @batch.nil?
          @batch ||= Batch.new self
          @batch.add msg, block
        else
          unless @batch.try_add msg, block
            publish_batch!
            @batch = Batch.new self
            @batch.add msg, block
          end
        end

        init_resources!

        publish_batch! if @batch.ready?

        @cond.signal
      end
      nil
    end
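The locking and batch-rollover pattern used by #publish, #flush, and #stop can be pictured with a much smaller stand-in. `SketchPublisher` below is hypothetical: its batch is a plain array limited only by message count, "publishing" a batch just records it, and there are no threads, timers, or callbacks. It is meant only to show how MonitorMixin's `synchronize` guards the shared batch.

```ruby
require "monitor"

# Hypothetical stand-in for the publisher. Publishing a batch only appends
# it to `published`, so the batching behavior is easy to observe.
class SketchPublisher
  include MonitorMixin

  attr_reader :published

  def initialize(max_messages: 3)
    super() # lets MonitorMixin set up its internal monitor
    @max_messages = max_messages
    @batch = []
    @published = []
    @stopped = false
  end

  # Adds a message; publishes the batch once it reaches max_messages.
  def publish(data)
    synchronize do
      raise "Can't publish when stopped." if @stopped

      @batch << data
      publish_batch! if @batch.size >= @max_messages
    end
    nil
  end

  # Publishes whatever is in the current batch, even if it is not full.
  def flush
    synchronize { publish_batch! }
    self
  end

  # Publishes the remaining messages and rejects further publish calls.
  def stop
    synchronize do
      break if @stopped

      @stopped = true
      publish_batch!
    end
    self
  end

  private

  def publish_batch!
    return if @batch.empty?

    @published << @batch
    @batch = []
  end
end
```

Returning `self` from `flush` and `stop` mirrors the listings on this page and allows chaining such as `publisher.stop.wait!` on the real class.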
#started? ⇒ boolean
Whether the publisher has been started.
    # File 'lib/google/cloud/pubsub/async_publisher.rb', line 178
    def started?
      !stopped?
    end
#stop ⇒ AsyncPublisher
Begins the process of stopping the publisher. Messages already in the queue will be published, but no new messages can be added. Use #wait! to block until the publisher is fully stopped and all pending messages have been published.
    # File 'lib/google/cloud/pubsub/async_publisher.rb', line 125
    def stop
      synchronize do
        break if @stopped

        @stopped = true
        publish_batch!
        @cond.signal
        @publish_thread_pool.shutdown if @publish_thread_pool
      end

      self
    end
#stopped? ⇒ boolean
Whether the publisher has been stopped.
    # File 'lib/google/cloud/pubsub/async_publisher.rb', line 186
    def stopped?
      synchronize { @stopped }
    end
#wait!(timeout = nil) ⇒ AsyncPublisher
Blocks until the publisher is fully stopped, all pending messages have been published, and all callbacks have completed. The optional timeout is passed to each thread pool's wait in turn.
    # File 'lib/google/cloud/pubsub/async_publisher.rb', line 145
    def wait! timeout = nil
      synchronize do
        if @publish_thread_pool
          @publish_thread_pool.wait_for_termination timeout
        end

        if @callback_thread_pool
          @callback_thread_pool.shutdown
          @callback_thread_pool.wait_for_termination timeout
        end
      end

      self
    end
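#stop and #wait! split shutdown into two phases: first stop accepting work, then block until the work already queued has finished. The stand-in pool below illustrates that split using only stdlib threads and a Queue. `SketchPool` and its internals are hypothetical; only the method names `shutdown` and `wait_for_termination` are taken from the listing above, and the real pool objects may behave differently in detail.

```ruby
# Hypothetical worker pool built on stdlib threads draining a shared queue.
class SketchPool
  def initialize(size)
    @queue = Queue.new
    @threads = Array.new(size) do
      Thread.new do
        # Queue#pop returns nil once the queue is closed and empty,
        # which ends the loop and lets the thread finish.
        while (job = @queue.pop)
          job.call
        end
      end
    end
  end

  # Enqueues a job; raises ClosedQueueError after shutdown.
  def post(&job)
    @queue << job
  end

  # Phase one: refuse new jobs, but let queued jobs run to completion.
  def shutdown
    @queue.close
  end

  # Phase two: wait up to `timeout` seconds in total (nil waits
  # indefinitely). Returns true if every worker thread finished in time.
  def wait_for_termination(timeout = nil)
    deadline = timeout && (Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout)

    @threads.all? do |thread|
      remaining = deadline && [deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC), 0].max
      thread.join(remaining) # nil on timeout, the thread otherwise
    end
  end
end
```

A caller would post jobs, call `shutdown`, and then call `wait_for_termination` with a timeout, checking the return value to learn whether the pool actually drained.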