Skip to content

Commit

Permalink
Cherry pick/revert gax executor (#23854)
Browse files Browse the repository at this point in the history
* Merge pull request #23795: Revert 23234: issue #23794

* Revert "Update BQIO to a single scheduled executor service reduce threads (#23234)" (#23793)

This reverts commit 8e2431c.

(cherry picked from commit 01da3fc)

* Merge pull request #23795: Revert 23234: issue #23794

(cherry picked from commit d38f577)

Co-authored-by: reuvenlax <relax@google.com>
  • Loading branch information
johnjcasey and reuvenlax committed Oct 26, 2022
1 parent edb250b commit 31a866a
Showing 1 changed file with 0 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.core.ApiFuture;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.FixedHeaderProvider;
Expand Down Expand Up @@ -106,7 +105,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -122,7 +120,6 @@
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.ExecutorOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.FluentBackoff;
Expand Down Expand Up @@ -1488,36 +1485,12 @@ private static BigQueryWriteClient newBigQueryWriteClient(BigQueryOptions option
return BigQueryWriteClient.create(
BigQueryWriteSettings.newBuilder()
.setCredentialsProvider(() -> options.as(GcpOptions.class).getGcpCredential())
.setBackgroundExecutorProvider(new OptionsExecutionProvider(options))
.build());
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/**
* OptionsExecutionProvider is a utility class used to wrap the Pipeline-wide {@link
* ScheduledExecutorService} into a supplier for the {@link BigQueryWriteClient}.
*/
private static class OptionsExecutionProvider implements ExecutorProvider {

private final BigQueryOptions options;

public OptionsExecutionProvider(BigQueryOptions options) {
this.options = options;
}

@Override
public boolean shouldAutoClose() {
return false;
}

@Override
public ScheduledExecutorService getExecutor() {
return options.as(ExecutorOptions.class).getScheduledExecutorService();
}
}

public static CustomHttpErrors createBigQueryClientCustomErrors() {
CustomHttpErrors.Builder builder = new CustomHttpErrors.Builder();
// 403 errors, to list tables, matching this URL:
Expand Down

0 comments on commit 31a866a

Please sign in to comment.