Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/define partition for bigquery import #163

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions bigquery/CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

1. Remove dependency on GCS connector code.

1. Add a property to specify the BigQuery output table partitioning definition:

mapred.bq.output.table.partitioning

### 0.13.14 - 2019-02-13

1. POM updates for GCS connector 1.9.14.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ public class BigQueryConfiguration {
*/
public static final String OUTPUT_TABLE_SCHEMA_KEY = "mapred.bq.output.table.schema";

/**
* Configuration key for the output table partitioning used by the output format. This key is
* stored as a {@link String}.
*/
public static final String OUTPUT_TABLE_PARTITIONING_KEY = "mapred.bq.output.table.partitioning";

/**
* Configuration key for the Cloud KMS encryption key that will be used to protect output BigQuery
* table. This key is stored as a {@link String}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package com.google.cloud.hadoop.io.bigquery;

import static com.google.common.flogger.LazyArgs.lazy;

import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.Bigquery.Jobs.Insert;
import com.google.api.services.bigquery.model.Dataset;
Expand All @@ -26,6 +28,7 @@
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -114,6 +117,7 @@ public void importFederatedFromGcs(
* @param projectId the project on whose behalf to perform the load.
* @param tableRef the reference to the destination table.
* @param schema the schema of the source data to populate the destination table by.
* @param timePartitioning time partitioning to populate the destination table.
* @param kmsKeyName the Cloud KMS encryption key used to protect the output table.
* @param sourceFormat the file format of the source data.
* @param writeDisposition the write disposition of the output table.
Expand All @@ -127,25 +131,29 @@ public void importFromGcs(
String projectId,
TableReference tableRef,
@Nullable TableSchema schema,
@Nullable TimePartitioning timePartitioning,
@Nullable String kmsKeyName,
BigQueryFileFormat sourceFormat,
String writeDisposition,
List<String> gcsPaths,
boolean awaitCompletion)
throws IOException, InterruptedException {
logger.atInfo().log(
"Importing into table '%s' from %s paths; path[0] is '%s'; awaitCompletion: %s",
BigQueryStrings.toString(tableRef),
"Importing into table '%s' from %s paths; path[0] is '%s'; awaitCompletion: %s;"
+ " timePartitioning: %s",
lazy(() -> BigQueryStrings.toString(tableRef)),
gcsPaths.size(),
gcsPaths.isEmpty() ? "(empty)" : gcsPaths.get(0),
awaitCompletion);
awaitCompletion,
timePartitioning);

// Create load conf with minimal requirements.
JobConfigurationLoad loadConfig = new JobConfigurationLoad();
loadConfig.setSchema(schema);
loadConfig.setSourceFormat(sourceFormat.getFormatIdentifier());
loadConfig.setSourceUris(gcsPaths);
loadConfig.setDestinationTable(tableRef);
loadConfig.setTimePartitioning(timePartitioning);
loadConfig.setWriteDisposition(writeDisposition);
if (!Strings.isNullOrEmpty(kmsKeyName)) {
loadConfig.setDestinationEncryptionConfiguration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
import com.google.cloud.hadoop.io.bigquery.BigQueryStrings;
Expand Down Expand Up @@ -314,6 +315,31 @@ static Optional<BigQueryTableSchema> getTableSchema(Configuration conf) throws I
return Optional.empty();
}

/**
 * Derives the output table time partitioning from the given configuration.
 *
 * @param conf the configuration to read the partitioning key from.
 * @return the parsed table time partitioning, or an absent value when the key is not set in the
 *     configuration.
 * @throws IOException if the configured partitioning value could not be parsed as JSON.
 */
static Optional<BigQueryTimePartitioning> getTablePartitioning(Configuration conf)
    throws IOException {
  String partitioningJson = conf.get(BigQueryConfiguration.OUTPUT_TABLE_PARTITIONING_KEY);
  // No partitioning configured: report absence rather than failing.
  if (Strings.isNullOrEmpty(partitioningJson)) {
    return Optional.empty();
  }
  try {
    return Optional.of(
        BigQueryTimePartitioning.wrap(BigQueryTimePartitioning.getFromJson(partitioningJson)));
  } catch (IOException e) {
    // Re-wrap so the error names the offending configuration key.
    throw new IOException(
        "Unable to parse key '" + BigQueryConfiguration.OUTPUT_TABLE_PARTITIONING_KEY + "'.",
        e);
  }
}

/**
* Gets the output table KMS key name based on the given configuration.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.hadoop.io.bigquery.output;

import com.google.api.client.json.JsonParser;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.model.TimePartitioning;
import java.io.IOException;

/**
 * Wrapper for BigQuery {@link TimePartitioning}.
 *
 * <p>This class is used to avoid client code depending on BigQuery API classes directly, so that
 * there is no potential conflict between different versions of BigQuery API libraries in the
 * client.
 *
 * @see TimePartitioning
 */
public class BigQueryTimePartitioning {

  /** The wrapped BigQuery API partitioning; set once at construction. */
  private final TimePartitioning timePartitioning;

  /** Creates a wrapper around an empty {@link TimePartitioning}. */
  public BigQueryTimePartitioning() {
    this.timePartitioning = new TimePartitioning();
  }

  /** Creates a wrapper around the given {@link TimePartitioning}. */
  public BigQueryTimePartitioning(TimePartitioning timePartitioning) {
    this.timePartitioning = timePartitioning;
  }

  /** Returns the partitioning type (e.g. {@code "DAY"}), or {@code null} if not set. */
  public String getType() {
    return timePartitioning.getType();
  }

  /** Sets the partitioning type (e.g. {@code "DAY"}). */
  public void setType(String type) {
    timePartitioning.setType(type);
  }

  /** Returns the partitioning field, or {@code null} if not set. */
  public String getField() {
    return timePartitioning.getField();
  }

  /** Sets the field on which the table is partitioned. */
  public void setField(String field) {
    timePartitioning.setField(field);
  }

  /**
   * Returns the partition expiration in milliseconds, or {@code null} if not set.
   *
   * <p>Returns the boxed {@link Long} (matching the underlying API getter) instead of a primitive
   * {@code long}: auto-unboxing a primitive return value would throw a
   * {@link NullPointerException} whenever no expiration is configured.
   */
  public Long getExpirationMs() {
    return timePartitioning.getExpirationMs();
  }

  /** Sets the partition expiration in milliseconds. */
  public void setExpirationMs(long expirationMs) {
    timePartitioning.setExpirationMs(expirationMs);
  }

  /** Returns whether queries must include a partition filter, or {@code null} if not set. */
  public Boolean getRequirePartitionFilter() {
    return timePartitioning.getRequirePartitionFilter();
  }

  /** Sets whether queries over this table must include a partition filter. */
  public void setRequirePartitionFilter(Boolean requirePartitionFilter) {
    timePartitioning.setRequirePartitionFilter(requirePartitionFilter);
  }

  @Override
  public int hashCode() {
    // Delegates to the wrapped object so equal wrappers hash equally.
    return timePartitioning.hashCode();
  }

  @Override
  public boolean equals(Object obj) {
    if (!(obj instanceof BigQueryTimePartitioning)) {
      return false;
    }
    BigQueryTimePartitioning other = (BigQueryTimePartitioning) obj;
    return timePartitioning.equals(other.timePartitioning);
  }

  /** Returns the wrapped {@link TimePartitioning}; package-private for internal use. */
  TimePartitioning get() {
    return timePartitioning;
  }

  /**
   * Parses a {@link TimePartitioning} from its JSON representation.
   *
   * @throws IOException if {@code json} cannot be parsed.
   */
  static TimePartitioning getFromJson(String json) throws IOException {
    JsonParser parser = JacksonFactory.getDefaultInstance().createJsonParser(json);
    return parser.parseAndClose(TimePartitioning.class);
  }

  /**
   * Serializes the wrapped partitioning to JSON.
   *
   * @throws IOException if serialization fails.
   */
  public String getAsJson() throws IOException {
    return JacksonFactory.getDefaultInstance().toString(timePartitioning);
  }

  /** Wraps an existing {@link TimePartitioning}; package-private for internal use. */
  static BigQueryTimePartitioning wrap(TimePartitioning tableTimePartitioning) {
    return new BigQueryTimePartitioning(tableTimePartitioning);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public void commitJob(JobContext context) throws IOException {
String destProjectId = BigQueryOutputConfiguration.getProjectId(conf);
String writeDisposition = BigQueryOutputConfiguration.getWriteDisposition(conf);
Optional<BigQueryTableSchema> destSchema = BigQueryOutputConfiguration.getTableSchema(conf);
Optional<BigQueryTimePartitioning> timePartitioning =
BigQueryOutputConfiguration.getTablePartitioning(conf);
String kmsKeyName = BigQueryOutputConfiguration.getKmsKeyName(conf);
BigQueryFileFormat outputFileFormat = BigQueryOutputConfiguration.getFileFormat(conf);
List<String> sourceUris = getOutputFileURIs();
Expand All @@ -72,6 +74,7 @@ public void commitJob(JobContext context) throws IOException {
destProjectId,
destTable,
destSchema.isPresent() ? destSchema.get().get() : null,
timePartitioning.isPresent() ? timePartitioning.get().get() : null,
kmsKeyName,
outputFileFormat,
writeDisposition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public Job answer(InvocationOnMock invocationOnMock) throws Throwable {
jobProjectId,
tableRef,
fakeTableSchema,
/* timePartitioning= */ null,
kmsKeyName,
BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_DEFAULT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,9 @@ public void setup() throws Exception {
.setProjectId(PROJECT_ID)
.setDatasetId(DATASET_ID)
.setTableId("shakespeare"),
null,
null,
/* schema= */ null,
/* timePartitioning= */ null,
/* kmsKeyName= */ null,
BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
"WRITE_EMPTY",
ImmutableList.of("gs://" + testBucket + "/shakespeare"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.hadoop.io.bigquery.output;

import static com.google.common.truth.Truth.assertThat;

import java.io.IOException;
import org.junit.Test;

public class BigQueryTimePartitioningTest {

public static final String TIME_PARTITIONING_JSON =
"{\"expirationMs\":\"1000\",\"field\":\"ingestDate\",\"requirePartitionFilter\":true,"
+ "\"type\":\"DAY\"}";

@Test
public void testConvertToJson() throws IOException {
BigQueryTimePartitioning bigQueryTimePartitioning = new BigQueryTimePartitioning();
bigQueryTimePartitioning.setType("DAY");
bigQueryTimePartitioning.setExpirationMs(1000L);
bigQueryTimePartitioning.setField("ingestDate");
bigQueryTimePartitioning.setRequirePartitionFilter(true);

assertThat(bigQueryTimePartitioning.getAsJson()).isEqualTo(TIME_PARTITIONING_JSON);
}

@Test
public void testConvertFromJson() throws IOException {
BigQueryTimePartitioning bigQueryTimePartitioning = new BigQueryTimePartitioning();
bigQueryTimePartitioning.setType("DAY");
bigQueryTimePartitioning.setExpirationMs(1000L);
bigQueryTimePartitioning.setField("ingestDate");
bigQueryTimePartitioning.setRequirePartitionFilter(true);

assertThat(BigQueryTimePartitioning.getFromJson(TIME_PARTITIONING_JSON))
.isEqualTo(bigQueryTimePartitioning.get());
}

@Test
public void testConversion_OnlyTypeIsPresent() throws IOException {
BigQueryTimePartitioning bigQueryTimePartitioning = new BigQueryTimePartitioning();
bigQueryTimePartitioning.setType("DAY");
String json = bigQueryTimePartitioning.getAsJson();
pbonito marked this conversation as resolved.
Show resolved Hide resolved

assertThat(json).isEqualTo("{\"type\":\"DAY\"}");
assertThat(BigQueryTimePartitioning.getFromJson(json).getType()).isEqualTo("DAY");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.cloud.hadoop.fs.gcs.InMemoryGoogleHadoopFileSystem;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
Expand Down Expand Up @@ -69,6 +70,10 @@ public class IndirectBigQueryOutputCommitterTest {
private static final String QUALIFIED_TEST_TABLE_ID =
String.format("%s:%s.%s", TEST_PROJECT_ID, TEST_DATASET_ID, TEST_TABLE_ID);

/** Sample table time partitioning used for output. */
private static final BigQueryTimePartitioning TEST_TIME_PARTITIONING =
BigQueryTimePartitioning.wrap(new TimePartitioning().setType("DAY"));

/** Sample output file format for the committer. */
private static final BigQueryFileFormat TEST_FILE_FORMAT =
BigQueryFileFormat.NEWLINE_DELIMITED_JSON;
Expand Down Expand Up @@ -151,6 +156,8 @@ public void setUp() throws IOException {
TEST_FILE_FORMAT,
TEST_OUTPUT_CLASS);
BigQueryOutputConfiguration.setKmsKeyName(conf, TEST_KMS_KEY_NAME);
conf.set(
BigQueryConfiguration.OUTPUT_TABLE_PARTITIONING_KEY, TEST_TIME_PARTITIONING.getAsJson());

// Setup sample data.
outputTableRef = BigQueryOutputConfiguration.getTableReference(conf);
Expand Down Expand Up @@ -204,6 +211,7 @@ public void testCommitJob() throws IOException, InterruptedException {
eq(TEST_PROJECT_ID),
eq(outputTableRef),
eq(TEST_TABLE_SCHEMA.get()),
eq(TEST_TIME_PARTITIONING.get()),
eq(TEST_KMS_KEY_NAME),
eq(TEST_FILE_FORMAT),
eq(TEST_WRITE_DISPOSITION),
Expand Down Expand Up @@ -234,6 +242,7 @@ public void testCommitJobInterrupt() throws IOException, InterruptedException {
any(String.class),
any(TableReference.class),
any(TableSchema.class),
any(TimePartitioning.class),
anyString(),
any(BigQueryFileFormat.class),
any(String.class),
Expand All @@ -249,6 +258,7 @@ public void testCommitJobInterrupt() throws IOException, InterruptedException {
eq(TEST_PROJECT_ID),
eq(outputTableRef),
eq(TEST_TABLE_SCHEMA.get()),
eq(TEST_TIME_PARTITIONING.get()),
eq(TEST_KMS_KEY_NAME),
eq(TEST_FILE_FORMAT),
eq(TEST_WRITE_DISPOSITION),
Expand Down