Class: Google::Cloud::Bigquery::Table::AsyncInserter
- Inherits:
-
Object
- Object
- Google::Cloud::Bigquery::Table::AsyncInserter
- Includes:
- MonitorMixin
- Defined in:
- lib/google/cloud/bigquery/table/async_inserter.rb
Overview
AsyncInserter
Used to insert multiple rows in batches to a topic. See #insert_async.
Defined Under Namespace
Classes: Result
Instance Attribute Summary collapse
-
#interval ⇒ Numeric
readonly
The number of seconds to collect rows before the batch is inserted.
-
#max_bytes ⇒ Integer
readonly
The maximum size of rows to be collected before the batch is inserted.
-
#max_rows ⇒ Integer
readonly
The maximum number of rows to be collected before the batch is inserted.
-
#threads ⇒ Integer
readonly
The number of threads used to insert rows.
Instance Method Summary collapse
-
#flush ⇒ AsyncInserter
Forces all rows in the current batch to be inserted immediately.
-
#insert(rows) ⇒ Object
Adds rows to the async inserter to be inserted.
-
#started? ⇒ boolean
Whether the inserter has been started.
-
#stop ⇒ AsyncInserter
Begins the process of stopping the inserter.
-
#stopped? ⇒ boolean
Whether the inserter has been stopped.
-
#wait!(timeout = nil) ⇒ AsyncInserter
Blocks until the inserter is fully stopped, all pending rows have been inserted, and all callbacks have completed.
Instance Attribute Details
#interval ⇒ Numeric (readonly)
The number of seconds to collect rows before the batch is inserted. Default is 10.
63 64 65 |
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 63 def interval @interval end |
#max_bytes ⇒ Integer (readonly)
The maximum size of rows to be collected before the batch is inserted. Default is 10,000,000 (10MB).
63 64 65 |
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 63 def max_bytes @max_bytes end |
#max_rows ⇒ Integer (readonly)
The maximum number of rows to be collected before the batch is inserted. Default is 500.
63 64 65 |
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 63 def max_rows @max_rows end |
#threads ⇒ Integer (readonly)
The number of threads used to insert rows. Default is 4.
63 64 65 |
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 63 def threads @threads end |
Instance Method Details
#flush ⇒ AsyncInserter
Forces all rows in the current batch to be inserted immediately.
178 179 180 181 182 183 184 185 |
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 178 def flush synchronize do push_batch_request! @cond.signal end self end |
#insert(rows) ⇒ Object
Adds rows to the async inserter to be inserted. Rows will be collected in batches and inserted together. See Google::Cloud::Bigquery::Table#insert_async.
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 104 def insert rows return nil if rows.nil? return nil if rows.is_a?(Array) && rows.empty? rows = [rows] if rows.is_a? Hash synchronize do rows.each do |row| if @batch.nil? @batch = Batch.new max_bytes: @max_bytes, max_rows: @max_rows @batch.insert row else unless @batch.try_insert row push_batch_request! @batch = Batch.new max_bytes: @max_bytes, max_rows: @max_rows @batch.insert row end end @batch_created_at ||= ::Time.now @background_thread ||= Thread.new { run_background } push_batch_request! if @batch.ready? end @cond.signal end true end |
#started? ⇒ boolean
Whether the inserter has been started.
192 193 194 |
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 192 def started? !stopped? end |
#stop ⇒ AsyncInserter
Begins the process of stopping the inserter. Rows already in the queue will be inserted, but no new rows can be added. Use #wait! to block until the inserter is fully stopped and all pending rows have been inserted.
144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 144 def stop synchronize do break if @stopped @stopped = true push_batch_request! @cond.signal end self end |
#stopped? ⇒ boolean
Whether the inserter has been stopped.
201 202 203 |
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 201 def stopped? synchronize { @stopped } end |
#wait!(timeout = nil) ⇒ AsyncInserter
164 165 166 167 168 169 170 171 |
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 164 def wait! timeout = nil synchronize do @thread_pool.shutdown @thread_pool.wait_for_termination timeout end self end |