Skip to content

Commit

Permalink
Improve exception message
Browse files Browse the repository at this point in the history
Fixes: GoogleCloudDataproc#22

	Change on 2019/01/03 by idv <idv@google.com>

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=227718811
  • Loading branch information
medb committed Jan 3, 2019
1 parent 60fc61b commit 9cca327
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 28 deletions.
2 changes: 2 additions & 0 deletions bigquery/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

1. POM updates for GCS connector 1.9.12.

2. Improve exception message for BigQuery job execution errors.


0.13.11 - 2018-12-20

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package com.google.cloud.hadoop.io.bigquery;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.client.util.Sleeper;
Expand All @@ -23,7 +25,6 @@
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.cloud.hadoop.util.ResilientOperation;
import com.google.cloud.hadoop.util.RetryDeterminer;
import com.google.common.base.Preconditions;
import com.google.common.flogger.GoogleLogger;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
Expand All @@ -35,9 +36,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.util.Progressable;

/**
* Helper methods to interact with BigQuery.
*/
/** Helper methods to interact with BigQuery. */
public class BigQueryUtils {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

Expand All @@ -61,15 +60,11 @@ public class BigQueryUtils {
* @param projectId the project that is polling.
* @param jobReference the job to poll.
* @param progressable to get progress of task.
*
* @throws IOException on IO Error.
* @throws InterruptedException on sleep interrupt.
*/
public static void waitForJobCompletion(
Bigquery bigquery,
String projectId,
JobReference jobReference,
Progressable progressable)
Bigquery bigquery, String projectId, JobReference jobReference, Progressable progressable)
throws IOException, InterruptedException {

Sleeper sleeper = Sleeper.DEFAULT;
Expand All @@ -88,16 +83,19 @@ public static void waitForJobCompletion(
// While job is incomplete continue to poll.
while (notDone) {
BackOff operationBackOff = new ExponentialBackOff.Builder().build();
Get get = bigquery.jobs()
.get(projectId, jobReference.getJobId())
.setLocation(jobReference.getLocation());

Job pollJob = ResilientOperation.retry(
ResilientOperation.getGoogleRequestCallable(get),
operationBackOff,
RetryDeterminer.RATE_LIMIT_ERRORS,
IOException.class,
sleeper);
Get get =
bigquery
.jobs()
.get(projectId, jobReference.getJobId())
.setLocation(jobReference.getLocation());

Job pollJob =
ResilientOperation.retry(
ResilientOperation.getGoogleRequestCallable(get),
operationBackOff,
RetryDeterminer.RATE_LIMIT_ERRORS,
IOException.class,
sleeper);

elapsedTime = System.currentTimeMillis() - startTime;
logger.atFine().log(
Expand All @@ -106,16 +104,16 @@ public static void waitForJobCompletion(
if (pollJob.getStatus().getState().equals("DONE")) {
notDone = false;
if (pollJob.getStatus().getErrorResult() != null) {
throw new IOException(pollJob.getStatus().getErrorResult().getMessage());
throw new IOException(
"Error during BigQuery job execution: " + pollJob.getStatus().getErrorResult());
}
} else {
long millisToWait = pollBackOff.nextBackOffMillis();
if (millisToWait == BackOff.STOP) {
throw new IOException(
String.format(
"Job %s failed to complete after %s millis.",
jobReference.getJobId(),
elapsedTime));
jobReference.getJobId(), elapsedTime));
}
// Pause execution for the configured duration before polling job status again.
Thread.sleep(millisToWait);
Expand All @@ -137,18 +135,20 @@ public static List<TableFieldSchema> getSchemaFromString(String fields) {
// Parse the output schema for Json from fields.
JsonParser jsonParser = new JsonParser();
JsonArray json = jsonParser.parse(fields).getAsJsonArray();
List<TableFieldSchema> fieldsList = new ArrayList<>();
List<TableFieldSchema> fieldsList = new ArrayList<>();

// For each item in the list of fields.
for (JsonElement jsonElement : json) {
Preconditions.checkArgument(jsonElement.isJsonObject(),
"Expected JsonObject for element, got '%s'.", jsonElement);
checkArgument(
jsonElement.isJsonObject(), "Expected JsonObject for element, got '%s'.", jsonElement);
JsonObject jsonObject = jsonElement.getAsJsonObject();

// Set the name and type.
Preconditions.checkArgument(jsonObject.get("name") != null,
checkArgument(
jsonObject.get("name") != null,
"Expected non-null entry for key 'name' in JsonObject '%s'", jsonObject);
Preconditions.checkArgument(jsonObject.get("type") != null,
checkArgument(
jsonObject.get("type") != null,
"Expected non-null entry for key 'type' in JsonObject '%s'", jsonObject);
TableFieldSchema fieldDef = new TableFieldSchema();
fieldDef.setName(jsonObject.get("name").getAsString());
Expand All @@ -161,7 +161,7 @@ public static List<TableFieldSchema> getSchemaFromString(String fields) {

// If the type is RECORD set the fields.
if (jsonObject.get("type").getAsString().equals("RECORD")) {
Preconditions.checkArgument(
checkArgument(
jsonObject.get("fields") != null,
"Expected non-null entry for key 'fields' in JsonObject of type RECORD: '%s'",
jsonObject);
Expand Down

0 comments on commit 9cca327

Please sign in to comment.