CDPD-12081 Upgrade gcs connector to v1.9.17 from upstream in HDP 3.1-maint #8

Open

Wants to merge 69 commits into base branch HDP-3.1-maint

Commits (69)
1447c6c
Update versions for next connectors release development.
medb Nov 1, 2018
7babc9b
Use assertThrows where possible #cleanup
medb Nov 5, 2018
3608d4c
Use 'uri-path' as the default path codec.
medb Nov 6, 2018
2f580f8
Fix GCSIO integration test.
medb Dec 6, 2018
078cc17
Update GCS connector dependencies to latest versions.
medb Dec 6, 2018
cb07768
Parallelize Maven builds
medb Dec 10, 2018
e522221
Prefetch metadata with only 1 list request.
medb Dec 11, 2018
8978a99
Support lazy initialization of GHFS
medb Dec 13, 2018
eb8e5a9
Make it possible to set `fs.gs.system.bucket` to an empty string (#139)
jphalip Dec 15, 2018
5068483
Remove redundant log message before thrown exception in ResilientOper…
medb Dec 14, 2018
0bfa68a
Clean up.
medb Dec 17, 2018
15616bf
Set default for "fs.gs.working.dir" to "/" (#140)
jphalip Dec 20, 2018
c9088d9
Minor clean up
medb Dec 18, 2018
2392540
LSC: Remove or broaden the visibility of @VisibleForTesting annotatio…
graememorgan Dec 20, 2018
fc5b370
Fix compilation failures
medb Dec 20, 2018
9d77d19
Release GCS connector 1.9.11 and BQ connector 0.13.11.
Dec 20, 2018
03a23eb
Update versions for next connectors release development
Dec 20, 2018
f7c90df
Add Maven Central badges
medb Dec 20, 2018
8256670
Improve links in Maven Central badges
medb Dec 21, 2018
c2df8f3
Add LGTM badges
medb Dec 21, 2018
7b52cf8
Add more badges
medb Dec 23, 2018
0cb3521
Remove redundant license badge
medb Dec 24, 2018
9e572ec
Remove redundant lgtm.com badge
medb Dec 25, 2018
dafb138
Fix GCS connector documentation.
medb Dec 28, 2018
ab9b5ea
Improve exception message
medb Jan 3, 2019
55528bc
Fix partial read if exception is thrown on last retry.
medb Jan 3, 2019
6ef6121
Add info logging for failed delete and rename operations.
medb Jan 16, 2019
0e30d8a
Add kmsKeyName to GoogleCloudStorageWriteChannel (#146)
udim Jan 18, 2019
6600463
Update connectors dependencies.
medb Jan 25, 2019
5a9ed1c
Improve exception message.
medb Jan 26, 2019
7a7f06e
Minor fixes to read logic.
medb Jan 26, 2019
7713969
#cleanup test failure message formatting
medb Jan 26, 2019
75ce5ee
Fix for error-prone UnnecessaryParentheses warning
medb Jan 28, 2019
0a6a927
Roll-back Apache HTTP Client version to fix integration tests.
medb Jan 28, 2019
fcc8ede
Improve GCS IO exception messages.
medb Jan 28, 2019
1942282
Initial set for location restriction. Since the regions do not exist …
DanSedov Jan 29, 2019
6b28380
Improve `testMultipleDeleteBucket` test case.
medb Jan 29, 2019
a1673fd
Parallelize get and list GCS requests and remove redundant GCS reques…
medb Jan 29, 2019
a711b8f
Fix bug that could lead to data duplication when reading files with G…
medb Jan 30, 2019
8d17fe9
Release GCS connector 1.9.12 and BQ connector 0.13.12.
medb Jan 30, 2019
adf9311
Update versions for next connectors release development
medb Jan 30, 2019
52f5055
Fix directory inference.
medb Feb 4, 2019
142c51b
Release GCS connector 1.9.13 and BQ connector 0.13.13.
medb Feb 4, 2019
e564678
Update versions for next connectors release development
medb Feb 4, 2019
dcae140
Add Stack Overflow as a Q&A resource for connector questions.
aman-ebay Feb 8, 2019
d3f61ab
Clean up Markdown formatting [skip ci]
mbrukman Feb 8, 2019
f33f594
Improve README files
medb Feb 8, 2019
5cabe18
Fix usages of any(<Primitive>.class) matchers
TimvdLippe Feb 4, 2019
d9ea4b8
Implement Hadoop File Sytem concat method (#147)
tomwhite Feb 11, 2019
3729769
Use daemon threads for concurrent globbing.
medb Feb 11, 2019
02b17d1
Add Hadoop File System extended attributes support.
medb Feb 14, 2019
4a428d5
Fix Hadoop 1 tests
medb Feb 14, 2019
e7dfbd9
Release GCS connector 1.9.14 and BQ connector 0.13.14.
medb Feb 14, 2019
4e097f2
Update versions for next connectors release development
medb Feb 14, 2019
c920e4e
Do not enforce directory path for inferred implicit directories
medb Feb 21, 2019
1550a05
Do not parallelize GCS list requests, because it leads to too high QP…
medb Feb 21, 2019
c2c14e3
Release GCS connector 1.9.15 and BQ connector 0.13.15.
medb Feb 21, 2019
137337c
Update versions for next connectors release development
medb Feb 23, 2019
9e4ace2
Break and stop the pagination if we have the results (#152)
Fokko Feb 23, 2019
f81d823
Use getMaxRemainingResults instead of manual computation
medb Feb 24, 2019
bde7be4
Eagerly fetch GoogleCloudStorageReadChannel metadata if 'fs.gs.inputs…
medb Feb 25, 2019
6abbb16
Release GCS connector 1.9.16 and BQ connector 0.13.16.
medb Feb 25, 2019
dd6b3cc
Update versions for next connectors release development
medb May 15, 2019
018749f
BigQuery connector: support nested record type in field schema.
Apr 27, 2019
152ff92
Initialize metadata in GoogleCloudStorageReadChannel.size() method, i…
medb May 13, 2019
1194136
Add property to parallelize GCS requests in `listStatus` and `getFile…
medb May 14, 2019
e6f742b
Add support for partitioned BigQuery tables (#163)
pbonito May 15, 2019
6fb7795
Release GCS connector 1.9.17 and BQ connector 0.13.17.
medb May 15, 2019
8461dc1
HWX: Modify pom to include HDP version
sidseth Nov 9, 2018
29 changes: 24 additions & 5 deletions README.md
@@ -1,16 +1,28 @@
# bigdata-interop

[![GitHub release](https://img.shields.io/github/release/GoogleCloudPlatform/bigdata-interop.svg)](https://github.com/GoogleCloudPlatform/bigdata-interop/releases/latest)
[![GitHub release date](https://img.shields.io/github/release-date/GoogleCloudPlatform/bigdata-interop.svg)](https://github.com/GoogleCloudPlatform/bigdata-interop/releases/latest)
[![Code Quality: Java](https://img.shields.io/lgtm/grade/java/g/GoogleCloudPlatform/bigdata-interop.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/GoogleCloudPlatform/bigdata-interop/context:java)

Libraries and tools for interoperability between Apache Hadoop related
open-source software and Google Cloud Platform.

## Google Cloud Storage connector for Apache Hadoop

[![Maven Central](https://img.shields.io/maven-central/v/com.google.cloud.bigdataoss/gcs-connector/hadoop1.svg?label=Maven%20Central)](https://search.maven.org/search?q=g:com.google.cloud.bigdataoss%20AND%20a:gcs-connector%20AND%20v:hadoop1-*)
[![Maven Central](https://img.shields.io/maven-central/v/com.google.cloud.bigdataoss/gcs-connector/hadoop2.svg?label=Maven%20Central)](https://search.maven.org/search?q=g:com.google.cloud.bigdataoss%20AND%20a:gcs-connector%20AND%20v:hadoop2-*)
[![Maven Central](https://img.shields.io/maven-central/v/com.google.cloud.bigdataoss/gcs-connector/hadoop3.svg?label=Maven%20Central)](https://search.maven.org/search?q=g:com.google.cloud.bigdataoss%20AND%20a:gcs-connector%20AND%20v:hadoop3-*)

The Google Cloud Storage connector for Hadoop enables running MapReduce jobs
directly on data in Google Cloud Storage by implementing the Hadoop FileSystem
interface. For details, see [the README](gcs/README.md).

## Google BigQuery connector for Apache Hadoop MapReduce

[![Maven Central](https://img.shields.io/maven-central/v/com.google.cloud.bigdataoss/bigquery-connector/hadoop1.svg?label=Maven%20Central)](https://search.maven.org/search?q=g:com.google.cloud.bigdataoss%20AND%20a:bigquery-connector%20AND%20v:hadoop1-*)
[![Maven Central](https://img.shields.io/maven-central/v/com.google.cloud.bigdataoss/bigquery-connector/hadoop2.svg?label=Maven%20Central)](https://search.maven.org/search?q=g:com.google.cloud.bigdataoss%20AND%20a:bigquery-connector%20AND%20v:hadoop2-*)
[![Maven Central](https://img.shields.io/maven-central/v/com.google.cloud.bigdataoss/bigquery-connector/hadoop3.svg?label=Maven%20Central)](https://search.maven.org/search?q=g:com.google.cloud.bigdataoss%20AND%20a:bigquery-connector%20AND%20v:hadoop3-*)

The Google BigQuery connector for Hadoop MapReduce enables running MapReduce
jobs on data in BigQuery by implementing the InputFormat & OutputFormat
interfaces. For more details see
@@ -24,9 +36,8 @@ For more details see [the README](pubsub/README.md)

## Building the Cloud Storage (GCS) and BigQuery connectors

All the connectors can be built with Apache Maven 3 (as of 2018-08-07, version
3.5.4 has been tested). To build the connector for specific Hadoop version, run
the following commands from the main directory:
To build the connector for specific Hadoop version, run the following commands
from the main directory:

```bash
# with Hadoop 1 support:
@@ -58,9 +69,17 @@ To add a dependency on one of the connectors using Maven, use the following:
<groupId>com.google.cloud.bigdataoss</groupId>
<!-- Cloud Storage: -->
<artifactId>gcs-connector</artifactId>
<version>hadoop2-1.9.10</version>
<version>hadoop2-1.9.16</version>
<!-- or, for BigQuery: -->
<artifactId>bigquery-connector</artifactId>
<version>hadoop2-0.13.10</version>
<version>hadoop2-0.13.16</version>
</dependency>
```

## Resources

On **Stack Overflow**, use the tag
[`google-cloud-dataproc`](https://stackoverflow.com/tags/google-cloud-dataproc)
for questions about the connectors in this repository. This tag receives
responses from the Stack Overflow community and Google engineers, who monitor
the tag and offer unofficial support.
47 changes: 47 additions & 0 deletions bigquery/CHANGES.txt
@@ -1,3 +1,50 @@
0.13.17 - 2019-05-15

1. POM updates for GCS connector 1.9.17.

2. Support nested record type in field schema in BigQuery connector.

3. Add a property to specify BigQuery tables partitioning definition:

mapred.bq.output.table.partitioning


0.13.16 - 2019-02-25

1. POM updates for GCS connector 1.9.16.


0.13.15 - 2019-02-21

1. POM updates for GCS connector 1.9.15.


0.13.14 - 2019-02-13

1. POM updates for GCS connector 1.9.14.


0.13.13 - 2019-02-04

1. POM updates for GCS connector 1.9.13.


0.13.12 - 2019-01-30

1. POM updates for GCS connector 1.9.12.

2. Improve exception message for BigQuery job execution errors.

3. Update all dependencies to latest versions.


0.13.11 - 2018-12-20

1. POM updates for GCS connector 1.9.11.

2. Update all dependencies to latest versions.


0.13.10 - 2018-11-01

1. POM updates for GCS connector 1.9.10.
9 changes: 4 additions & 5 deletions bigquery/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>bigdataoss-parent</artifactId>
<version>1.9.10</version>
<version>1.9.17</version>
<relativePath>../pom.xml</relativePath>
</parent>

@@ -32,7 +32,6 @@
</description>

<artifactId>bigquery-connector</artifactId>
<version>${hadoop.identifier}-0.13.10</version>

<profiles>
<profile>
@@ -183,7 +182,7 @@
<dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>util-hadoop</artifactId>
<version>${hadoop.identifier}-${bigdataoss.version}</version>
<version>${bigdataoss.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
@@ -195,12 +194,12 @@
<dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>gcs-connector</artifactId>
<version>${hadoop.identifier}-${bigdataoss.version}</version>
<version>${bigdataoss.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>gcs-connector</artifactId>
<version>${hadoop.identifier}-${bigdataoss.version}</version>
<version>${bigdataoss.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
@@ -129,6 +129,12 @@ public class BigQueryConfiguration {
*/
public static final String OUTPUT_TABLE_SCHEMA_KEY = "mapred.bq.output.table.schema";

/**
* Configuration key for the output table partitioning used by the output format. This key is
* stored as a {@link String}.
*/
public static final String OUTPUT_TABLE_PARTITIONING_KEY = "mapred.bq.output.table.partitioning";

/**
* Configuration key for the Cloud KMS encryption key that will be used to protect output BigQuery
* table. This key is stored as a {@link String}.
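For context on the new key, here is a minimal, hypothetical sketch of wiring it into a Hadoop job configuration. The key name comes from the `OUTPUT_TABLE_PARTITIONING_KEY` constant above; the JSON value shape (a BigQuery `TimePartitioning` with `type` and `field`) and the `event_time` column are assumptions, not something this diff defines.

```java
import org.apache.hadoop.conf.Configuration;

public class PartitioningConfigSketch {
  public static Configuration configure() {
    Configuration conf = new Configuration();
    // Key name taken from OUTPUT_TABLE_PARTITIONING_KEY above; the value is an assumed
    // JSON rendering of BigQuery's TimePartitioning ("event_time" is illustrative only).
    conf.set(
        "mapred.bq.output.table.partitioning",
        "{\"type\": \"DAY\", \"field\": \"event_time\"}");
    return conf;
  }
}
```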
@@ -13,6 +13,8 @@
*/
package com.google.cloud.hadoop.io.bigquery;

import static com.google.common.flogger.LazyArgs.lazy;

import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.Bigquery.Jobs.Insert;
import com.google.api.services.bigquery.model.Dataset;
@@ -26,6 +28,7 @@
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -141,19 +144,62 @@ public void importFromGcs(
List<String> gcsPaths,
boolean awaitCompletion)
throws IOException, InterruptedException {
importFromGcs(
projectId,
tableRef,
schema,
/* timePartitioning= */ null,
kmsKeyName,
sourceFormat,
writeDisposition,
gcsPaths,
awaitCompletion);
}

/**
* Imports data from GCS into BigQuery via a load job. Optionally polls for completion before
* returning.
*
* @param projectId the project on whose behalf to perform the load.
* @param tableRef the reference to the destination table.
* @param schema the schema of the source data to populate the destination table by.
* @param timePartitioning time partitioning to populate the destination table.
* @param kmsKeyName the Cloud KMS encryption key used to protect the output table.
* @param sourceFormat the file format of the source data.
* @param writeDisposition the write disposition of the output table.
* @param gcsPaths the location of the source data in GCS.
* @param awaitCompletion if true, block and poll until job completes, otherwise return as soon as
* the job has been successfully dispatched.
* @throws IOException
* @throws InterruptedException if interrupted while waiting for job completion.
*/
public void importFromGcs(
String projectId,
TableReference tableRef,
@Nullable TableSchema schema,
@Nullable TimePartitioning timePartitioning,
@Nullable String kmsKeyName,
BigQueryFileFormat sourceFormat,
String writeDisposition,
List<String> gcsPaths,
boolean awaitCompletion)
throws IOException, InterruptedException {
logger.atInfo().log(
"Importing into table '%s' from %s paths; path[0] is '%s'; awaitCompletion: %s",
BigQueryStrings.toString(tableRef),
"Importing into table '%s' from %s paths; path[0] is '%s'; awaitCompletion: %s;"
+ " timePartitioning: %s",
lazy(() -> BigQueryStrings.toString(tableRef)),
gcsPaths.size(),
gcsPaths.isEmpty() ? "(empty)" : gcsPaths.get(0),
awaitCompletion);
awaitCompletion,
timePartitioning);

// Create load conf with minimal requirements.
JobConfigurationLoad loadConfig = new JobConfigurationLoad();
loadConfig.setSchema(schema);
loadConfig.setSourceFormat(sourceFormat.getFormatIdentifier());
loadConfig.setSourceUris(gcsPaths);
loadConfig.setDestinationTable(tableRef);
loadConfig.setTimePartitioning(timePartitioning);
loadConfig.setWriteDisposition(writeDisposition);
if (!Strings.isNullOrEmpty(kmsKeyName)) {
loadConfig.setDestinationEncryptionConfiguration(
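A hedged usage sketch of the new overload follows. The project, dataset, table, and bucket names are placeholders, the `event_time` partition column is assumed, and the `BigQueryHelper`, `TableSchema`, and `BigQueryFileFormat` values are assumed to be set up elsewhere with an authorized `Bigquery` client.

```java
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
import com.google.cloud.hadoop.io.bigquery.BigQueryHelper;
import java.io.IOException;
import java.util.Collections;

class ImportFromGcsSketch {
  // `helper`, `schema`, and `sourceFormat` are assumed to be constructed elsewhere.
  static void loadDayPartitioned(
      BigQueryHelper helper, TableSchema schema, BigQueryFileFormat sourceFormat)
      throws IOException, InterruptedException {
    TableReference table =
        new TableReference()
            .setProjectId("my-project") // placeholder project
            .setDatasetId("my_dataset")
            .setTableId("events");
    // Daily partitioning on an assumed "event_time" column.
    TimePartitioning partitioning =
        new TimePartitioning().setType("DAY").setField("event_time");
    helper.importFromGcs(
        "my-project",
        table,
        schema,
        partitioning,
        /* kmsKeyName= */ null,
        sourceFormat,
        "WRITE_APPEND",
        Collections.singletonList("gs://my-bucket/export/part-*"), // placeholder path
        /* awaitCompletion= */ true);
  }
}
```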
@@ -210,7 +210,8 @@ public void commitTask(TaskAttemptContext context) throws IOException {
// Run the job.
logger.atFine().log(
"commitTask: Running table copy from %s to %s",
BigQueryStrings.toString(tempTableRef), BigQueryStrings.toString(finalTableRef));
lazy(() -> BigQueryStrings.toString(tempTableRef)),
lazy(() -> BigQueryStrings.toString(finalTableRef)));
Job response = bigQueryHelper.insertJobOrFetchDuplicate(projectId, job);
logger.atFine().log("Got response '%s'", response);

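The `lazy(...)` wrappers introduced here come from flogger's `LazyArgs`. A small standalone sketch of the pattern, where `expensiveToString` is a hypothetical stand-in for `BigQueryStrings.toString`:

```java
import static com.google.common.flogger.LazyArgs.lazy;

import com.google.common.flogger.GoogleLogger;

class LazyLogArgsSketch {
  private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

  void logTableCopy(Object tempTableRef, Object finalTableRef) {
    // Each lazy(...) defers the string conversion until FINE logging is actually
    // enabled, so disabled log statements pay almost nothing.
    logger.atFine().log(
        "commitTask: Running table copy from %s to %s",
        lazy(() -> expensiveToString(tempTableRef)),
        lazy(() -> expensiveToString(finalTableRef)));
  }

  private static String expensiveToString(Object ref) {
    return String.valueOf(ref); // hypothetical stand-in for BigQueryStrings.toString(ref)
  }
}
```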
@@ -13,6 +13,8 @@
*/
package com.google.cloud.hadoop.io.bigquery;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.client.util.Sleeper;
@@ -23,7 +25,6 @@
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.cloud.hadoop.util.ResilientOperation;
import com.google.cloud.hadoop.util.RetryDeterminer;
import com.google.common.base.Preconditions;
import com.google.common.flogger.GoogleLogger;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
@@ -35,9 +36,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.util.Progressable;

/**
* Helper methods to interact with BigQuery.
*/
/** Helper methods to interact with BigQuery. */
public class BigQueryUtils {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

@@ -61,15 +60,11 @@ public class BigQueryUtils {
* @param projectId the project that is polling.
* @param jobReference the job to poll.
* @param progressable to get progress of task.
*
* @throws IOException on IO Error.
* @throws InterruptedException on sleep interrupt.
*/
public static void waitForJobCompletion(
Bigquery bigquery,
String projectId,
JobReference jobReference,
Progressable progressable)
Bigquery bigquery, String projectId, JobReference jobReference, Progressable progressable)
throws IOException, InterruptedException {

Sleeper sleeper = Sleeper.DEFAULT;
@@ -88,16 +83,19 @@ public static void waitForJobCompletion(
// While job is incomplete continue to poll.
while (notDone) {
BackOff operationBackOff = new ExponentialBackOff.Builder().build();
Get get = bigquery.jobs()
.get(projectId, jobReference.getJobId())
.setLocation(jobReference.getLocation());

Job pollJob = ResilientOperation.retry(
ResilientOperation.getGoogleRequestCallable(get),
operationBackOff,
RetryDeterminer.RATE_LIMIT_ERRORS,
IOException.class,
sleeper);
Get get =
bigquery
.jobs()
.get(projectId, jobReference.getJobId())
.setLocation(jobReference.getLocation());

Job pollJob =
ResilientOperation.retry(
ResilientOperation.getGoogleRequestCallable(get),
operationBackOff,
RetryDeterminer.RATE_LIMIT_ERRORS,
IOException.class,
sleeper);

elapsedTime = System.currentTimeMillis() - startTime;
logger.atFine().log(
@@ -106,16 +104,16 @@
if (pollJob.getStatus().getState().equals("DONE")) {
notDone = false;
if (pollJob.getStatus().getErrorResult() != null) {
throw new IOException(pollJob.getStatus().getErrorResult().getMessage());
throw new IOException(
"Error during BigQuery job execution: " + pollJob.getStatus().getErrorResult());
}
} else {
long millisToWait = pollBackOff.nextBackOffMillis();
if (millisToWait == BackOff.STOP) {
throw new IOException(
String.format(
"Job %s failed to complete after %s millis.",
jobReference.getJobId(),
elapsedTime));
jobReference.getJobId(), elapsedTime));
}
// Pause execution for the configured duration before polling job status again.
Thread.sleep(millisToWait);
@@ -137,18 +135,20 @@ public static List<TableFieldSchema> getSchemaFromString(String fields) {
// Parse the output schema for Json from fields.
JsonParser jsonParser = new JsonParser();
JsonArray json = jsonParser.parse(fields).getAsJsonArray();
List<TableFieldSchema> fieldsList = new ArrayList<>();
List<TableFieldSchema> fieldsList = new ArrayList<>();

// For each item in the list of fields.
for (JsonElement jsonElement : json) {
Preconditions.checkArgument(jsonElement.isJsonObject(),
"Expected JsonObject for element, got '%s'.", jsonElement);
checkArgument(
jsonElement.isJsonObject(), "Expected JsonObject for element, got '%s'.", jsonElement);
JsonObject jsonObject = jsonElement.getAsJsonObject();

// Set the name and type.
Preconditions.checkArgument(jsonObject.get("name") != null,
checkArgument(
jsonObject.get("name") != null,
"Expected non-null entry for key 'name' in JsonObject '%s'", jsonObject);
Preconditions.checkArgument(jsonObject.get("type") != null,
checkArgument(
jsonObject.get("type") != null,
"Expected non-null entry for key 'type' in JsonObject '%s'", jsonObject);
TableFieldSchema fieldDef = new TableFieldSchema();
fieldDef.setName(jsonObject.get("name").getAsString());
@@ -161,7 +161,7 @@ public static List<TableFieldSchema> getSchemaFromString(String fields) {

// If the type is RECORD set the fields.
if (jsonObject.get("type").getAsString().equals("RECORD")) {
Preconditions.checkArgument(
checkArgument(
jsonObject.get("fields") != null,
"Expected non-null entry for key 'fields' in JsonObject of type RECORD: '%s'",
jsonObject);
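As a hedged illustration of the JSON this parser accepts: the field names below are made up, but the structure follows the `checkArgument` validations above, where every element needs `name` and `type` and a `RECORD` element additionally needs `fields` (the nested-record case added in 0.13.17).

```java
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.cloud.hadoop.io.bigquery.BigQueryUtils;
import java.util.List;

class SchemaStringSketch {
  static List<TableFieldSchema> parseExample() {
    // Every element needs "name" and "type"; RECORD elements also need "fields".
    String fields =
        "[{\"name\": \"user\", \"type\": \"RECORD\", \"fields\": ["
            + "{\"name\": \"id\", \"type\": \"STRING\"},"
            + "{\"name\": \"visits\", \"type\": \"INTEGER\"}]},"
            + "{\"name\": \"event_time\", \"type\": \"TIMESTAMP\"}]";
    return BigQueryUtils.getSchemaFromString(fields);
  }
}
```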