Merge pull request #35 from sonots/ruby
sonots committed Mar 14, 2016
2 parents 6e9d33e + ea1b72e commit feb6d2d
Showing 61 changed files with 3,272 additions and 1,847 deletions.
18 changes: 6 additions & 12 deletions .gitignore
@@ -1,14 +1,8 @@
target/
build/
pkg/
*.iml
*~
._*
.idea
tmp/
/pkg/
/tmp/
/.bundle/
/Gemfile.lock
vendor/
/classpath/
/.bundle
.yardoc
/embulk-*.jar
/.gradle
.ruby-version
.tags
18 changes: 18 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,21 @@
## 0.3.0 - YYYY-MM-DD

A big change is introduced: embulk-output-bigquery is now written in JRuby.

* [new feature] Support parallel loads. Fix [#28](https://github.com/embulk/embulk-output-bigquery/issues/28).
* [new feature] Create table first. Fix [#29](https://github.com/embulk/embulk-output-bigquery/issues/29).
* [new feature] Introduce rehearsal mode. Fix [#30](https://github.com/embulk/embulk-output-bigquery/issues/30).
* [new feature] Support `dataset_old` option for `replace_backup`. Fix [#31](https://github.com/embulk/embulk-output-bigquery/issues/31).
* [maintenance] Fix default timestamp format to `%Y-%m-%d %H:%M:%S.%6N`. Fix [#32](https://github.com/embulk/embulk-output-bigquery/issues/32).
* [new feature] Support request options such as `timeout_sec`, `open_timeout_sec`, `retries`. Fix [#33](https://github.com/embulk/embulk-output-bigquery/issues/33).
* [new feature] Support skipping file generation and loading already-generated files with the `skip_file_generation` option.
* [new feature] Guess BigQuery schema from Embulk schema. Fix [#1](https://github.com/embulk/embulk-output-bigquery/issues/1).
* [new feature] Support automatic dataset creation.
* [new feature] Support transactional append mode.
* [incompatibility change] Formatter plugin support is dropped. Formatting is now done by this plugin according to the specified `source_format`.
* [incompatibility change] Encoder plugin support is dropped. Compression is now done by this plugin according to the specified `compression`.
* [incompatibility change] `append` mode now performs a transactional append; the former non-transactional behavior is available as `append_direct` (previously called `append`).

## 0.2.3 - 2016-02-19

* [maintenance] Fix detection logic of delete_in_advance mode. [#26](https://github.com/embulk/embulk-output-bigquery/issues/26). @sonots thanks!
8 changes: 8 additions & 0 deletions Gemfile
@@ -0,0 +1,8 @@
source 'https://rubygems.org/'

gemspec
gem 'embulk-parser-none'
gem 'embulk-parser-jsonl'
gem 'pry-nav'
gem 'test-unit'
gem 'test-unit-rr'
20 changes: 20 additions & 0 deletions LICENSE.txt
@@ -0,0 +1,20 @@
MIT License

Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:

The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
204 changes: 165 additions & 39 deletions README.md
@@ -1,4 +1,3 @@

# embulk-output-bigquery

[Embulk](https://github.com/embulk/embulk/) output plugin to load/insert data into [Google BigQuery](https://cloud.google.com/bigquery/) using [direct insert](https://cloud.google.com/bigquery/loading-data-into-bigquery#loaddatapostrequest)
@@ -26,35 +25,61 @@ OAuth flow for installed applications.

#### Original options

| name | type | required? | default | description |
|:--------------------------|:------------|:-----------|:-------------|:-----------------------|
| mode | string | optional | "append" | [See below](#mode) |
| auth_method               | string      | optional   | "private_key" | `private_key`, `json_key`, or `compute_engine` |
| service_account_email | string | required when auth_method is private_key | | Your Google service account email
| p12_keyfile | string | required when auth_method is private_key | | Fullpath of private key in P12(PKCS12) format |
| json_keyfile | string | required when auth_method is json_key | | Fullpath of json key |
| sequence_format | string | optional | %03d.%02d | |
| file_ext | string | optional | | e.g. ".csv.gz" ".json.gz" |
| project | string | required | | project_id |
| dataset | string | required | | dataset |
| table | string | required | | table name |
| auto_create_table | boolean | optional | 0 | [See below](#dynamic-table-creating) |
| schema_file | string | optional | | /path/to/schema.json |
| template_table | string | optional | | existing_table_name [See below](#dynamic-table-creating) |
| prevent_duplicate_insert | boolean | optional | 0 | [See below](#data-consistency) |
| delete_from_local_when_job_end | boolean | optional | 0 | If set to true, delete local files when the job ends |
| job_status_max_polling_time | int | optional | 3600 sec | Max job status polling time |
| job_status_max_polling_time | int | optional | 10 sec | Job status polling interval |
| is_skip_job_result_check | boolean | optional | 0 | |
| application_name | string | optional | "Embulk BigQuery plugin" | Anything you like |
| name | type | required? | default | description |
|:-------------------------------------|:------------|:-----------|:-------------------------|:-----------------------|
| mode | string | optional | "append" | [See below](#mode) |
| auth_method                           | string      | optional   | "private_key"            | `private_key`, `json_key`, or `compute_engine` |
| service_account_email                 | string      | required when auth_method is private_key | | Your Google service account email |
| p12_keyfile                           | string      | required when auth_method is private_key | | Fullpath of private key in P12(PKCS12) format |
| json_keyfile                          | string      | required when auth_method is json_key | | Fullpath of json key |
| project | string | required if json_keyfile is not given | | project_id |
| dataset | string | required | | dataset |
| table | string | required | | table name |
| auto_create_dataset | boolean | optional | false | automatically create dataset |
| auto_create_table | boolean | optional | false | [See below](#dynamic-table-creating) |
| schema_file | string | optional | | /path/to/schema.json |
| template_table | string | optional | | template table name [See below](#dynamic-table-creating) |
| prevent_duplicate_insert | boolean | optional | false | [See below](#data-consistency) |
| job_status_max_polling_time | int | optional | 3600 sec | Max job status polling time |
| job_status_polling_interval | int | optional | 10 sec | Job status polling interval |
| is_skip_job_result_check              | boolean     | optional   | false                    | Skip waiting for the load job to finish. Available in `append` or `delete_in_advance` mode |
| with_rehearsal                        | boolean     | optional   | false                    | Load `rehearsal_counts` records as a rehearsal. The rehearsal loads into a temporary REHEARSAL table, which is deleted afterwards. Use this option to detect data errors as early as possible |
| rehearsal_counts | integer | optional | 1000 | Specify number of records to load in a rehearsal |
| column_options | hash | optional | | [See below](#column-options) |
| default_timezone | string | optional | UTC | |
| default_timestamp_format | string | optional | %Y-%m-%d %H:%M:%S.%6N | |
| payload_column | string | optional | nil | [See below](#formatter-performance-issue) |
| payload_column_index | integer | optional | nil | [See below](#formatter-performance-issue) |
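
For instance, a sketch that enables a rehearsal before the real load (names, paths, and counts below are illustrative placeholders, not defaults):

```yaml
out:
  type: bigquery
  mode: append
  auth_method: json_key
  json_keyfile: /path/to/json_keyfile.json
  project: your-project-000
  dataset: your_dataset_name
  table: your_table_name
  with_rehearsal: true  # load rehearsal_counts records into a temporary REHEARSAL table first
  rehearsal_counts: 500 # override the default of 1000
```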

Client or request options

| name | type | required? | default | description |
|:-------------------------------------|:------------|:-----------|:-------------------------|:-----------------------|
| timeout_sec | integer | optional | 300 | Seconds to wait for one block to be read |
| open_timeout_sec | integer | optional | 300 | Seconds to wait for the connection to open |
| retries | integer | optional | 5 | Number of retries |
| application_name | string | optional | "Embulk BigQuery plugin" | User-Agent |
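
As a sketch, these request options sit alongside the other plugin settings (the values below are illustrative, not recommendations):

```yaml
out:
  type: bigquery
  # ... authentication and destination options ...
  timeout_sec: 600      # seconds to wait for one block to be read
  open_timeout_sec: 60  # seconds to wait for the connection to open
  retries: 3            # number of retries
  application_name: my-embulk-loader  # sent as the User-Agent
```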

Options for intermediate local files

| name | type | required? | default | description |
|:-------------------------------------|:------------|:-----------|:-------------------------|:-----------------------|
| path_prefix                           | string      | optional   |                          | Path prefix of local files such as "/tmp/prefix_". Defaults to a randomly generated path using [tempfile](http://ruby-doc.org/stdlib-2.2.3/libdoc/tempfile/rdoc/Tempfile.html) |
| sequence_format                       | string      | optional   | .%d.%03d                 | Sequence format for pid and task index |
| file_ext                              | string      | optional   |                          | File extension of local files such as ".csv.gz" or ".json.gz". Defaults to a value derived from `source_format` and `compression` |
| skip_file_generation                  | boolean     | optional   |                          | Load already-generated local files into BigQuery if available. Specify the correct `path_prefix` and `file_ext` |
| delete_from_local_when_job_end        | boolean     | optional   | false                    | If set to true, delete local files when the job ends |
| compression | string | optional | "NONE" | Compression of local files (`GZIP` or `NONE`) |

`source_format` is also used to determine the formatter (CSV or JSONL).
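
For example, a sketch that pins intermediate files to a known location so that a failed run can be retried with `skip_file_generation` (paths and formats below are illustrative placeholders):

```yaml
out:
  type: bigquery
  # ... authentication and destination options ...
  path_prefix: /tmp/bigquery_  # with sequence_format and file_ext, files become e.g. /tmp/bigquery_.1234.000.jsonl.gz
  file_ext: .jsonl.gz
  source_format: NEWLINE_DELIMITED_JSON
  compression: GZIP
  # skip_file_generation: true  # uncomment on a retry to load the already-generated files as-is
```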

#### Same options as bq command-line tools or BigQuery job properties

The following options are the same as those of [bq command-line tools](https://cloud.google.com/bigquery/bq-command-line-tool#creatingtablefromfile) or BigQuery [job properties](https://cloud.google.com/bigquery/docs/reference/v2/jobs#resource).

| name | type | required? | default | description |
|:--------------------------|:------------|:-----------|:-------------|:-----------------------|
| source_format              | string      | required   | "CSV"        | File type (`NEWLINE_DELIMITED_JSON` or `CSV`) |
| max_bad_records | int | optional | 0 | |
| field_delimiter | char | optional | "," | |
| encoding | string | optional | "UTF-8" | `UTF-8` or `ISO-8859-1` |
@@ -70,26 +95,26 @@ out:
  auth_method: private_key # default
  service_account_email: ABCXYZ123ABCXYZ123.gserviceaccount.com
  p12_keyfile: /path/to/p12_keyfile.p12
  project: your-project-000
  dataset: your_dataset_name
  table: your_table_name
  compression: GZIP
  source_format: NEWLINE_DELIMITED_JSON
```

### mode

5 modes are provided.

##### append
Default. In `append` mode, the plugin inserts data into the existing table transactionally:

1. Load to temporary table.
2. Copy temporary table to destination table. (WRITE_APPEND)
##### append_direct
Inserts data into the existing table directly.
This is not transactional: if the load fails, the target table may be left with some rows already inserted.

##### replace
@@ -101,7 +126,7 @@ default. When append mode, plugin will insert data into existing table.
##### replace_backup

1. Load to temporary table.
2. Copy destination table to backup table. (dataset_old, table_old)
3. Copy temporary table to destination table. (WRITE_TRUNCATE)

`is_skip_job_result_check` must be false in `replace_backup` mode.
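
A sketch of a `replace_backup` configuration (project, dataset, and table names below are placeholders):

```yaml
out:
  type: bigquery
  mode: replace_backup
  # ... authentication options ...
  project: your-project-000
  dataset: your_dataset_name
  table: your_table_name
  dataset_old: your_backup_dataset_name  # receives the pre-replace copy of the destination table
  table_old: your_table_name_old
  is_skip_job_result_check: false        # must be false in replace_backup mode
```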
@@ -111,8 +136,6 @@ default. When append mode, plugin will insert data into existing table.
1. Delete destination table, if it exists.
2. Load to destination table.

`auto_create_table` must be true in `delete_in_advance` mode.

### Authentication

There are three supported methods to fetch an access token for the service account.
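
As a sketch, each method maps onto the options described above (emails and keyfile paths below are placeholders); configure exactly one:

```yaml
out:
  type: bigquery
  # 1) P12 private key
  auth_method: private_key
  service_account_email: ABCXYZ123ABCXYZ123.gserviceaccount.com
  p12_keyfile: /path/to/p12_keyfile.p12
  # 2) JSON key
  # auth_method: json_key
  # json_keyfile: /path/to/json_keyfile.json
  # 3) running on a Google Compute Engine instance
  # auth_method: compute_engine
```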
@@ -196,7 +219,7 @@ When `auto_create_table` is set to true, try to create the table using BigQuery API.

If the table already exists, data is inserted into it.

There are 3 ways to set the schema.

#### Set schema.json
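
For example, a minimal sketch (the path below is a placeholder; the file holds the BigQuery schema definition for the target table):

```yaml
out:
  type: bigquery
  auto_create_table: true
  schema_file: /path/to/schema.json  # BigQuery schema definition for the target table
```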

@@ -222,6 +245,78 @@ out:
template_table: existing_table_name
```

#### Guess from Embulk Schema

The plugin will try to guess the BigQuery schema from the Embulk schema. This is also configurable with `column_options`. See [Column Options](#column-options).

### Column Options

Column options are used to aid guessing the BigQuery schema, or to define conversions of values:

- **column_options**: advanced: an array of options for columns
  - **name**: column name
  - **type**: BigQuery type such as `BOOLEAN`, `INTEGER`, `FLOAT`, `STRING`, `TIMESTAMP`, and `RECORD`. See below for supported conversion types:
    - boolean: `BOOLEAN`, `STRING` (default: `BOOLEAN`)
    - long: `BOOLEAN`, `INTEGER`, `FLOAT`, `STRING`, `TIMESTAMP` (default: `INTEGER`)
    - double: `INTEGER`, `FLOAT`, `STRING`, `TIMESTAMP` (default: `FLOAT`)
    - string: `BOOLEAN`, `INTEGER`, `FLOAT`, `STRING`, `TIMESTAMP`, `RECORD` (default: `STRING`)
    - timestamp: `INTEGER`, `FLOAT`, `STRING`, `TIMESTAMP` (default: `TIMESTAMP`)
    - json: `STRING`, `RECORD` (default: `STRING`)
  - **mode**: BigQuery mode such as `NULLABLE`, `REQUIRED`, and `REPEATED` (string, default: `NULLABLE`)
  - **fields**: Describes the nested schema fields if the type property is set to `RECORD`. Note that this is **required** for `RECORD` columns.
  - **timestamp_format**: timestamp format to convert into/from `timestamp` (string, default is `default_timestamp_format`)
  - **timezone**: timezone to convert into/from `timestamp` (string, default is `default_timezone`)
- **default_timestamp_format**: default timestamp format for column_options (string, default is "%Y-%m-%d %H:%M:%S.%6N")
- **default_timezone**: default timezone for column_options (string, default is "UTC")

Example:

```yaml
out:
  type: bigquery
  auto_create_table: true
  column_options:
    - {name: date, type: STRING, timestamp_format: "%Y-%m-%d", timezone: "Asia/Tokyo"}
    - name: json_column
      type: RECORD
      fields:
        - {name: key1, type: STRING}
        - {name: key2, type: STRING}
```

NOTE: Type conversion is done in this JRuby plugin and could be slow. See [Formatter Performance Issue](#formatter-performance-issue) for ways to improve performance.

### Formatter Performance Issue

embulk-output-bigquery supports formatting records into CSV or JSON (and also formatting timestamp columns).
However, this plugin is written in JRuby, and JRuby plugins are generally slower than Java plugins.

Therefore, it is recommended to format records with a filter plugin written in Java, such as [embulk-filter-to_json](https://github.com/civitaspo/embulk-filter-to_json), as follows:

```yaml
filters:
  - type: to_json
    column: {name: payload, type: string}
    default_format: "%Y-%m-%d %H:%M:%S.%6N"
out:
  type: bigquery
  payload_column_index: 0 # or, payload_column: payload
```

Furthermore, if your files are originally JSONL or CSV, you can even skip the parser with [embulk-parser-none](https://github.com/sonots/embulk-parser-none), as follows:

```yaml
in:
  type: file
  path_prefix: example/example.jsonl
  parser:
    type: none
    column_name: payload
out:
  type: bigquery
  payload_column_index: 0 # or, payload_column: payload
```

### Data Consistency

When `prevent_duplicate_insert` is set to true, embulk-output-bigquery generates a job ID from the MD5 hash of the file and other options to prevent duplicate data insertion.
@@ -238,8 +333,39 @@ out:
prevent_duplicate_insert: true
```

## Development

### Run example:

Prepare a `json_keyfile` at /tmp/your-project-000.json, then:

```
$ ./gradlew gem
$ embulk bundle install --path vendor/bundle
$ embulk run -X page_size=1 -b . -l trace example/example.yml
```
### Run test:
```
$ bundle exec rake test
```
To run tests which actually connect to BigQuery, such as test/test_bigquery_client.rb,
prepare a `json_keyfile` at /tmp/your-project-000.json, then:
```
$ CONNECT=1 bundle exec ruby test/test_bigquery_client.rb
$ CONNECT=1 bundle exec ruby test/test_example.rb
```
### Release gem:
Update the gemspec, then:
```
$ bundle exec rake release
```
## ChangeLog
[CHANGELOG.md](CHANGELOG.md)
11 changes: 11 additions & 0 deletions Rakefile
@@ -0,0 +1,11 @@
require "bundler/gem_tasks"
require 'rake/testtask'

desc 'Run test_unit based test'
Rake::TestTask.new(:test) do |t|
t.libs << "test"
t.test_files = Dir["test/**/test_*.rb"].sort
t.verbose = true
#t.warning = true
end
task :default => :test
