Class: Google::Cloud::Bigquery::Table::AsyncInserter
- Inherits: Object
  - Object
  - Google::Cloud::Bigquery::Table::AsyncInserter
- Includes: MonitorMixin
- Defined in: lib/google/cloud/bigquery/table/async_inserter.rb
Overview
AsyncInserter
Used to insert multiple rows in batches into a table. See Google::Cloud::Bigquery::Table#insert_async.
Instance Attribute Summary
- #interval ⇒ Numeric (readonly)
  The number of seconds to collect rows before the batch is inserted.
- #max_bytes ⇒ Integer (readonly)
  The maximum total size, in bytes, of rows to be collected before the batch is inserted.
- #max_rows ⇒ Integer (readonly)
  The maximum number of rows to be collected before the batch is inserted.
- #threads ⇒ Integer (readonly)
  The number of threads used to insert rows.
Instance Method Summary
- #flush ⇒ AsyncInserter
  Forces all rows in the current batch to be inserted immediately.
- #insert(rows) ⇒ Object
  Adds rows to the async inserter to be inserted.
- #started? ⇒ boolean
  Whether the inserter has been started.
- #stop ⇒ AsyncInserter
  Begins the process of stopping the inserter.
- #stopped? ⇒ boolean
  Whether the inserter has been stopped.
- #wait!(timeout = nil) ⇒ AsyncInserter
  Blocks until the inserter is fully stopped, all pending rows have been inserted, and all callbacks have completed.
Instance Attribute Details
#interval ⇒ Numeric (readonly)
The number of seconds to collect rows before the batch is inserted. Default is 10.
    # File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 59

    def interval
      @interval
    end
#max_bytes ⇒ Integer (readonly)
The maximum total size, in bytes, of rows to be collected before the batch is inserted. Default is 10,000,000 (10MB).

    # File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 59

    def max_bytes
      @max_bytes
    end
#max_rows ⇒ Integer (readonly)
The maximum number of rows to be collected before the batch is inserted. Default is 500.
    # File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 59

    def max_rows
      @max_rows
    end
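The two limits above act together: a batch is sent when either one is reached. The following is a minimal sketch of that behaviour, not the library's code. `SketchBatch` and `sketch_batches` are hypothetical names, and measuring a row by the byte size of its JSON encoding is an assumption, since the real `Batch` class is not shown on this page.

```ruby
require "json"

# Hypothetical stand-in for the inserter's internal Batch class.
class SketchBatch
  attr_reader :rows

  def initialize max_bytes: 10_000_000, max_rows: 500
    @max_bytes = max_bytes
    @max_rows = max_rows
    @rows = []
    @bytes = 0
  end

  # Adds the row unconditionally.
  def insert row
    @rows << row
    @bytes += row_bytes(row)
  end

  # Adds the row only if the batch stays within both limits.
  def try_insert row
    return false if @rows.size + 1 > @max_rows
    return false if @bytes + row_bytes(row) > @max_bytes
    insert row
    true
  end

  # A batch is ready once either limit has been reached.
  def ready?
    @rows.size >= @max_rows || @bytes >= @max_bytes
  end

  private

  # Assumption: a row's size is the byte size of its JSON encoding.
  def row_bytes row
    row.to_json.bytesize
  end
end

# Groups rows into batches with the accumulate-then-split loop used by #insert.
def sketch_batches rows, max_bytes: 10_000_000, max_rows: 500
  batches = []
  batch = nil
  rows.each do |row|
    if batch.nil? || !batch.try_insert(row)
      batches << batch.rows if batch # the full batch is pushed first
      batch = SketchBatch.new max_bytes: max_bytes, max_rows: max_rows
      batch.insert row
    end
    next unless batch.ready?
    batches << batch.rows
    batch = nil
  end
  batches << batch.rows if batch # leftover rows wait for the timer or a flush
  batches
end
```

For example, five small rows with `max_rows: 2` are grouped into batches of two, two, and one row. In the real inserter the last, partial batch would stay pending until the interval elapses or #flush is called.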
#threads ⇒ Integer (readonly)
The number of threads used to insert rows. Default is 4.
    # File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 59

    def threads
      @threads
    end
Instance Method Details
#flush ⇒ AsyncInserter
Forces all rows in the current batch to be inserted immediately.
    # File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 174

    def flush
      synchronize do
        push_batch_request!
        @cond.signal
      end

      self
    end
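The `synchronize` and `@cond.signal` calls in #flush come from MonitorMixin. The sketch below shows, under stated assumptions, how a background thread could combine that condition variable with the interval attribute. `IntervalFlusher` is a hypothetical class written with the standard library only; the inserter's actual `run_background` method is not shown on this page and may differ.

```ruby
require "monitor"

# Hypothetical illustration of timer-driven flushing; not the library's code.
class IntervalFlusher
  include MonitorMixin

  attr_reader :flushed

  def initialize interval: 10
    super() # sets up the monitor provided by MonitorMixin
    @interval = interval
    @cond = new_cond
    @pending = []
    @flushed = []
    @stopped = false
    @thread = Thread.new { run_background }
  end

  def add item
    synchronize do
      @pending << item
      @created_at ||= Time.now
      @cond.signal # wake the background thread so it can start timing
    end
    self
  end

  # Pushes whatever is pending right away, like #flush above.
  def flush
    synchronize do
      push_pending!
      @cond.signal
    end
    self
  end

  def stop
    synchronize do
      @stopped = true
      push_pending!
      @cond.signal
    end
    @thread.join
    self
  end

  private

  def push_pending!
    return if @pending.empty?
    @flushed << @pending
    @pending = []
    @created_at = nil
  end

  def run_background
    synchronize do
      until @stopped
        if @pending.empty?
          @cond.wait # nothing to send; sleep until signalled
          next
        end
        age = Time.now - @created_at
        if age < @interval
          @cond.wait(@interval - age) # sleep for the rest of the interval
        else
          push_pending!
        end
      end
    end
  end
end
```

Calling `flush` on such an object sends the pending items without waiting for the interval, which is the behaviour #flush documents for the current batch.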
#insert(rows) ⇒ Object
Adds rows to the async inserter to be inserted. Rows will be collected in batches and inserted together. See Google::Cloud::Bigquery::Table#insert_async.
    # File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 100

    def insert rows
      return nil if rows.nil?
      return nil if rows.is_a?(Array) && rows.empty?
      rows = [rows] if rows.is_a? Hash

      synchronize do
        rows.each do |row|
          if @batch.nil?
            @batch = Batch.new max_bytes: @max_bytes, max_rows: @max_rows
            @batch.insert row
          else
            unless @batch.try_insert row
              push_batch_request!

              @batch = Batch.new max_bytes: @max_bytes,
                                 max_rows: @max_rows
              @batch.insert row
            end
          end

          @batch_created_at ||= ::Time.now
          @background_thread ||= Thread.new { run_background }

          push_batch_request! if @batch.ready?
        end

        @cond.signal
      end

      true
    end
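The three guard clauses at the top of #insert decide what the method accepts. The helper below isolates them as a small sketch; `normalize_rows` is a hypothetical name that does not exist in the library.

```ruby
# Hypothetical helper mirroring the guard clauses at the top of #insert.
# Returns nil when there is nothing to insert, otherwise an Array of rows.
def normalize_rows rows
  return nil if rows.nil?
  return nil if rows.is_a?(Array) && rows.empty?
  # A single Hash is one row. Without this wrap, rows.each would iterate
  # over the Hash's key/value pairs instead of over rows.
  rows = [rows] if rows.is_a? Hash
  rows
end

single = normalize_rows({ "first_name" => "Alice", "age" => 21 })
many = normalize_rows [{ "first_name" => "Alice" }, { "first_name" => "Bob" }]
puts single.length
puts many.length
```

So a caller may pass either one row as a Hash or several rows as an Array of Hashes. As the listing above shows, #insert returns nil for nil or empty input and true otherwise.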
#started? ⇒ boolean
Whether the inserter has been started.
    # File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 188

    def started?
      !stopped?
    end
#stop ⇒ AsyncInserter
Begins the process of stopping the inserter. Rows already in the queue will be inserted, but no new rows can be added. Use #wait! to block until the inserter is fully stopped and all pending rows have been inserted.
    # File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 140

    def stop
      synchronize do
        break if @stopped

        @stopped = true
        push_batch_request!
        @cond.signal
      end

      self
    end
#stopped? ⇒ boolean
Whether the inserter has been stopped.
    # File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 197

    def stopped?
      synchronize { @stopped }
    end
#wait!(timeout = nil) ⇒ AsyncInserter
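Blocks until the inserter is fully stopped, all pending rows have been inserted, and all callbacks have completed. The optional timeout is passed to the thread pool's wait_for_termination.

    # File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 160

    def wait! timeout = nil
      synchronize do
        @thread_pool.shutdown
        @thread_pool.wait_for_termination timeout
      end

      self
    end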
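The usual shutdown sequence is #stop followed by #wait!: the first call stops accepting work, the second blocks until queued work has finished. The sketch below illustrates that two-step lifecycle with the standard library only. `SketchWorkerPool` is a hypothetical class; the inserter's own `@thread_pool` is a different object whose implementation is not shown on this page.

```ruby
# Hypothetical stdlib-only worker pool illustrating stop followed by wait!.
class SketchWorkerPool
  def initialize threads: 4
    @queue = Queue.new
    @workers = Array.new(threads) do
      Thread.new do
        # Queue#pop returns nil once the queue is closed and drained,
        # which ends the loop and lets the worker exit.
        while (job = @queue.pop)
          job.call
        end
      end
    end
  end

  # Queues a job; raises ClosedQueueError once #stop has been called.
  def post &job
    @queue << job
    self
  end

  # Stops accepting new jobs; jobs already queued still run.
  def stop
    @queue.close
    self
  end

  def stopped?
    @queue.closed?
  end

  # Blocks until the workers have finished. In this sketch the timeout
  # (in seconds, nil meaning no limit) is applied to each worker in turn.
  def wait! timeout = nil
    @workers.each { |worker| worker.join timeout }
    self
  end
end

results = Queue.new
pool = SketchWorkerPool.new threads: 2
5.times { |i| pool.post { results << i * 2 } }
pool.stop.wait!
puts results.size
```

Because every job was queued before `stop`, all five have run by the time `wait!` returns without a timeout.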