Class: Google::Cloud::Bigquery::Table::AsyncInserter
- Inherits:
- Object
- Includes:
- MonitorMixin
- Defined in:
- lib/google/cloud/bigquery/table/async_inserter.rb
Overview
AsyncInserter
Used to insert multiple rows in batches to a table. See Google::Cloud::Bigquery::Table#insert_async.
Defined Under Namespace
Classes: Result
Instance Attribute Summary
- #interval ⇒ Numeric (readonly)
  The number of seconds to collect rows before the batch is inserted.
- #max_bytes ⇒ Integer (readonly)
  The maximum total size of rows, in bytes, to be collected before the batch is inserted.
- #max_rows ⇒ Integer (readonly)
  The maximum number of rows to be collected before the batch is inserted.
- #threads ⇒ Integer (readonly)
  The number of threads used to insert rows.
Instance Method Summary
- #flush ⇒ AsyncInserter
  Forces all rows in the current batch to be inserted immediately.
- #insert(rows, insert_ids: nil) ⇒ Object
  Adds rows to the async inserter to be inserted.
- #started? ⇒ boolean
  Whether the inserter has been started.
- #stop ⇒ AsyncInserter
  Begins the process of stopping the inserter.
- #stopped? ⇒ boolean
  Whether the inserter has been stopped.
- #wait!(timeout = nil) ⇒ AsyncInserter
  Blocks until the inserter is fully stopped, all pending rows have been inserted, and all callbacks have completed.
Instance Attribute Details
#interval ⇒ Numeric (readonly)
The number of seconds to collect rows before the batch is inserted. Default is 10.
    # File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 64
    def interval
      @interval
    end
#max_bytes ⇒ Integer (readonly)
The maximum total size of rows, in bytes, to be collected before the batch is inserted. Default is 10,000,000 (10 MB).
    # File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 64
    def max_bytes
      @max_bytes
    end
#max_rows ⇒ Integer (readonly)
The maximum number of rows to be collected before the batch is inserted. Default is 500.
    # File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 64
    def max_rows
      @max_rows
    end
#threads ⇒ Integer (readonly)
The number of threads used to insert rows. Default is 4.
    # File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 64
    def threads
      @threads
    end
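The interval attribute describes a time-based trigger: a batch is held for up to that many seconds before it is inserted. As a rough illustration only, the sketch below computes how much longer a batch would be held. The names SKETCH_DEFAULTS and seconds_until_due are hypothetical and are not part of the library; only the default values come from this page, and the library's own background loop is not shown here.

```ruby
# Default values as documented on this page. The constant name is hypothetical.
SKETCH_DEFAULTS = {
  max_bytes: 10_000_000, # total size of collected rows, in bytes
  max_rows:  500,        # number of collected rows
  interval:  10,         # seconds to collect rows
  threads:   4           # threads used to insert rows
}.freeze

# Hypothetical helper: seconds a batch created at +batch_created_at+ would
# still be held before the interval elapses. Returns 0 once it is overdue.
def seconds_until_due(batch_created_at, interval = SKETCH_DEFAULTS[:interval], now = Time.now)
  elapsed = now - batch_created_at # Time - Time gives seconds as a Float
  remaining = interval - elapsed
  remaining.positive? ? remaining : 0
end
```

A batch created four seconds ago with the default interval would, under this assumption, be held for about six more seconds unless max_rows or max_bytes is reached first.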
Instance Method Details
#flush ⇒ AsyncInserter
Forces all rows in the current batch to be inserted immediately.
    # File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 185
    def flush
      synchronize do
        push_batch_request!
        @cond.signal
      end

      self
    end
#insert(rows, insert_ids: nil) ⇒ Object
Adds rows to the async inserter to be inserted. Rows will be collected in batches and inserted together. Returns nil when rows is nil or an empty array, and true otherwise. See Google::Cloud::Bigquery::Table#insert_async.
    # File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 111
    def insert rows, insert_ids: nil
      return nil if rows.nil?
      return nil if rows.is_a?(Array) && rows.empty?
      rows, insert_ids = validate_insert_args rows, insert_ids

      synchronize do
        rows.zip(Array(insert_ids)).each do |row, insert_id|
          if @batch.nil?
            @batch = Batch.new max_bytes: @max_bytes, max_rows: @max_rows
            @batch.insert row, insert_id
          else
            unless @batch.try_insert row, insert_id
              push_batch_request!

              @batch = Batch.new max_bytes: @max_bytes, max_rows: @max_rows
              @batch.insert row, insert_id
            end
          end

          @batch_created_at ||= ::Time.now
          @background_thread ||= Thread.new { run_background }

          push_batch_request! if @batch.ready?
        end

        @cond.signal
      end

      true
    end
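The control flow in #insert can be tried out in isolation. The following is a minimal sketch, not the library's implementation: SketchBatch is a hypothetical stand-in for the internal Batch class, and it assumes that row size is estimated from the JSON encoding and that pushing a batch clears the current one. Neither assumption is confirmed by the source shown here. The helper batch_rows is also hypothetical.

```ruby
require "json"

# Hypothetical stand-in for the library's internal Batch class.
class SketchBatch
  attr_reader :rows

  def initialize(max_bytes:, max_rows:)
    @max_bytes = max_bytes
    @max_rows = max_rows
    @rows = []
    @bytes = 0
  end

  # Adds the row unconditionally (used for the first row of a new batch).
  def insert(row)
    @rows << row
    @bytes += row.to_json.bytesize # assumption: size is the JSON byte length
  end

  # Adds the row only if it fits within both limits; returns true or false.
  def try_insert(row)
    size = row.to_json.bytesize
    return false if @rows.size + 1 > @max_rows
    return false if @bytes + size > @max_bytes

    insert row
    true
  end

  # True once either limit has been reached.
  def ready?
    @rows.size >= @max_rows || @bytes >= @max_bytes
  end
end

# Mirrors the branching in #insert: start a batch, roll over when a row
# does not fit, and push as soon as the batch is ready.
# Returns [pushed_batches, pending_rows].
def batch_rows(rows, max_bytes: 10_000_000, max_rows: 500)
  pushed = []
  batch = nil

  rows.each do |row|
    if batch.nil?
      batch = SketchBatch.new max_bytes: max_bytes, max_rows: max_rows
      batch.insert row
    elsif !batch.try_insert(row)
      pushed << batch.rows
      batch = SketchBatch.new max_bytes: max_bytes, max_rows: max_rows
      batch.insert row
    end

    next unless batch.ready?

    pushed << batch.rows
    batch = nil # assumption: pushing clears the current batch
  end

  [pushed, batch ? batch.rows : []]
end
```

With max_rows: 2, five rows would be split into two pushed batches of two rows each, with one row left pending until the interval elapses or #flush is called.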
#started? ⇒ boolean
Whether the inserter has been started.
    # File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 199
    def started?
      !stopped?
    end
#stop ⇒ AsyncInserter
Begins the process of stopping the inserter. Rows already in the queue will be inserted, but no new rows can be added. Use #wait! to block until the inserter is fully stopped and all pending rows have been inserted.
    # File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 151
    def stop
      synchronize do
        break if @stopped

        @stopped = true
        push_batch_request!
        @cond.signal
      end

      self
    end
#stopped? ⇒ boolean
Whether the inserter has been stopped.
    # File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 208
    def stopped?
      synchronize { @stopped }
    end
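The lifecycle methods #flush, #stop, #started? and #stopped? all run under the monitor supplied by MonitorMixin. The sketch below reproduces that pattern with a hypothetical SketchLifecycle class so the behavior can be tried without BigQuery. Its push_batch! simply records the batch in an array, standing in for the library's push_batch_request!, whose body is not shown on this page.

```ruby
require "monitor"

# Hypothetical stand-in for the inserter's lifecycle; not the library class.
class SketchLifecycle
  include MonitorMixin

  attr_reader :pushed

  def initialize
    super() # lets MonitorMixin set up the monitor
    @cond = new_cond
    @batch = nil
    @pushed = []
    @stopped = false
  end

  def insert(row)
    synchronize { (@batch ||= []) << row }
    true
  end

  # Pushes the current batch immediately and returns self for chaining.
  def flush
    synchronize do
      push_batch!
      @cond.signal
    end

    self
  end

  # Idempotent: a second call leaves the block at `break` and does nothing.
  def stop
    synchronize do
      break if @stopped

      @stopped = true
      push_batch!
      @cond.signal
    end

    self
  end

  def started?
    !stopped?
  end

  def stopped?
    synchronize { @stopped }
  end

  private

  # Stand-in for push_batch_request!: records the batch and clears it.
  def push_batch!
    return if @batch.nil?

    @pushed << @batch
    @batch = nil
  end
end
```

Because #flush and #stop return self, calls can be chained, for example inserter.stop.wait! on the real class.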
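#wait!(timeout = nil) ⇒ AsyncInserter
Blocks until the inserter is fully stopped, all pending rows have been inserted, and all callbacks have completed. As the source shows, #wait! does not itself stop the inserter or push the current batch; it shuts down the thread pool and waits for it to terminate, passing timeout through. Call #stop first.
    # File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 171
    def wait! timeout = nil
      synchronize do
        @thread_pool.shutdown
        @thread_pool.wait_for_termination timeout
      end

      self
    end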
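The shutdown-then-wait sequence in #wait! can be illustrated with a small pool built only on Ruby's core Queue and Thread classes. This is a sketch under stated assumptions: SketchPool is hypothetical, the type of @thread_pool is not shown on this page, and the only thing borrowed from the source is the pair of method names shutdown and wait_for_termination.

```ruby
# Hypothetical minimal thread pool; not the pool the library uses.
class SketchPool
  def initialize(size)
    @queue = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        # Queue#pop returns nil once the queue is closed and drained,
        # which ends the loop and lets the worker thread finish.
        while (job = @queue.pop)
          job.call
        end
      end
    end
  end

  # Enqueues a job. Raises ClosedQueueError after shutdown.
  def post(&job)
    @queue << job
    self
  end

  # Stops accepting new jobs; already queued jobs still run.
  def shutdown
    @queue.close
    self
  end

  # Waits for all workers to finish. Returns true if they all finished
  # within +timeout+ seconds (nil waits without a limit), false otherwise.
  def wait_for_termination(timeout = nil)
    deadline = timeout && (monotonic_now + timeout)
    @workers.all? do |worker|
      remaining = deadline && [deadline - monotonic_now, 0].max
      !worker.join(remaining).nil?
    end
  end

  private

  def monotonic_now
    Process.clock_gettime Process::CLOCK_MONOTONIC
  end
end
```

In this sketch, jobs queued before shutdown still complete, which matches the documented behavior that pending rows are inserted before #wait! returns.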