Class: Google::Cloud::Bigquery::Table::AsyncInserter

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/google/cloud/bigquery/table/async_inserter.rb

Overview

AsyncInserter

Used to insert multiple rows in batches to a table. See Google::Cloud::Bigquery::Table#insert_async.

Examples:

require "google/cloud/bigquery"

bigquery = Google::Cloud::Bigquery.new
dataset = bigquery.dataset "my_dataset"
table = dataset.table "my_table"
# Create the async inserter. The block receives a result that either
# carries an error or reports the insert and error counts.
inserter = table.insert_async do |result|
  if result.error?
    log_error result.error
  else
    log_insert "inserted #{result.insert_count} rows " \
      "with #{result.error_count} errors"
  end
end

# Rows are plain hashes; a single Hash or an Array of hashes is accepted.
rows = [
  { "first_name" => "Alice", "age" => 21 },
  { "first_name" => "Bob", "age" => 22 }
]
# Rows are collected into batches and inserted together.
inserter.insert rows

# Begin stopping the inserter, then block until pending rows are inserted.
inserter.stop.wait!

Defined Under Namespace

Classes: Result

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#interval ⇒ Numeric (readonly)

The number of seconds to collect rows before the batch is inserted. Default is 10.

Returns:

  • (Numeric)

    the current value of interval



63
64
65
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 63

# The number of seconds to collect rows before the batch is inserted.
#
# @return [Numeric] the current value of interval
def interval
  @interval
end

#max_bytes ⇒ Integer (readonly)

The maximum size of rows to be collected before the batch is inserted. Default is 10,000,000 (10MB).

Returns:

  • (Integer)

    the current value of max_bytes



63
64
65
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 63

# The maximum size of rows to be collected before the batch is inserted.
#
# @return [Integer] the current value of max_bytes
def max_bytes
  @max_bytes
end

#max_rows ⇒ Integer (readonly)

The maximum number of rows to be collected before the batch is inserted. Default is 500.

Returns:

  • (Integer)

    the current value of max_rows



63
64
65
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 63

# The maximum number of rows to be collected before the batch is inserted.
#
# @return [Integer] the current value of max_rows
def max_rows
  @max_rows
end

#threads ⇒ Integer (readonly)

The number of threads used to insert rows. Default is 4.

Returns:

  • (Integer)

    the current value of threads



63
64
65
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 63

# The number of threads used to insert rows.
#
# @return [Integer] the current value of threads
def threads
  @threads
end

Instance Method Details

#flush ⇒ AsyncInserter

Forces all rows in the current batch to be inserted immediately.

Returns:

  • (AsyncInserter)

    returns self so calls can be chained.



178
179
180
181
182
183
184
185
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 178

# Forces all rows in the current batch to be inserted immediately.
#
# @return [AsyncInserter] returns self so calls can be chained
def flush
  synchronize do
    # Hand off the current batch, then signal the condition variable.
    # NOTE(review): presumably this wakes the background thread waiting on
    # @cond -- confirm against run_background.
    push_batch_request!
    @cond.signal
  end

  self
end

#insert(rows) ⇒ Object

Adds rows to the async inserter to be inserted. Rows will be collected in batches and inserted together. See Google::Cloud::Bigquery::Table#insert_async.

Parameters:

  • rows (Hash, Array<Hash>)

    A hash object or array of hash objects containing the data.



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 104

# Adds rows to the async inserter to be inserted. Rows are collected in
# batches and inserted together.
#
# @param rows [Hash, Array<Hash>] A hash object or array of hash objects
#   containing the data. A single Hash is wrapped in an Array.
#
# @return [true, nil] nil when rows is nil or an empty Array, otherwise
#   true once every row has been added to a batch
def insert rows
  return nil if rows.nil? || (rows.is_a?(Array) && rows.empty?)
  rows = [rows] if rows.is_a? Hash

  synchronize do
    rows.each do |row|
      # Short-circuits to false when there is no current batch.
      accepted = !@batch.nil? && @batch.try_insert(row)

      unless accepted
        # An existing batch that refused the row is handed off before a
        # fresh batch is started for this row.
        push_batch_request! unless @batch.nil?
        @batch = Batch.new max_bytes: @max_bytes, max_rows: @max_rows
        @batch.insert row
      end

      # Both are created lazily, on the first row that needs them.
      @batch_created_at ||= ::Time.now
      @background_thread ||= Thread.new { run_background }

      push_batch_request! if @batch.ready?
    end

    @cond.signal
  end

  true
end

#started? ⇒ boolean

Whether the inserter has been started.

Returns:

  • (boolean)

    true when started, false otherwise.



192
193
194
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 192

# Whether the inserter has been started. This is simply the negation of
# #stopped?, so an inserter that has not been stopped reports as started.
#
# @return [boolean] true when started, false otherwise
def started?
  !stopped?
end

#stop ⇒ AsyncInserter

Begins the process of stopping the inserter. Rows already in the queue will be inserted, but no new rows can be added. Use #wait! to block until the inserter is fully stopped and all pending rows have been inserted.

Returns:

  • (AsyncInserter)

    returns self so calls can be chained.



144
145
146
147
148
149
150
151
152
153
154
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 144

# Begins the process of stopping the inserter. The first call marks the
# inserter as stopped, hands off the current batch and signals the
# condition variable; later calls do nothing. Use #wait! to block until
# the inserter is fully stopped.
#
# @return [AsyncInserter] returns self so calls can be chained
def stop
  synchronize do
    unless @stopped
      @stopped = true
      push_batch_request!
      @cond.signal
    end
  end

  self
end

#stopped? ⇒ boolean

Whether the inserter has been stopped.

Returns:

  • (boolean)

    true when stopped, false otherwise.



201
202
203
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 201

# Whether the inserter has been stopped. The flag is read inside
# synchronize, so the read is serialized with #stop, which sets the flag
# inside synchronize as well.
#
# @return [boolean] true when stopped, false otherwise
def stopped?
  synchronize { @stopped }
end

#wait!(timeout = nil) ⇒ AsyncInserter

Blocks until the inserter is fully stopped, all pending rows have been inserted, and all callbacks have completed. Does not stop the inserter. To stop the inserter, first call #stop and then call #wait! to block until the inserter is stopped.

Returns:

  • (AsyncInserter)

    returns self so calls can be chained.



164
165
166
167
168
169
170
171
# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 164

# Shuts down the thread pool and blocks until it has terminated. Call
# #stop first so that rows still held in the current batch are handed off.
#
# @param timeout [Numeric, nil] passed through to the thread pool's
#   wait_for_termination. NOTE(review): nil presumably means "wait
#   indefinitely" -- confirm against the thread pool implementation.
#
# @return [AsyncInserter] returns self so calls can be chained
def wait! timeout = nil
  synchronize do
    @thread_pool.shutdown
    # NOTE(review): this wait runs while still inside synchronize, so other
    # threads calling #insert, #flush, #stop or #stopped? block until it
    # returns -- confirm that pool tasks and callbacks never need this lock.
    @thread_pool.wait_for_termination timeout
  end

  self
end