diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index d45b3c6ebcd47..b8bc272280970 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -48,9 +48,9 @@
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter;
-import org.apache.beam.sdk.extensions.gcp.util.FastNanoClockAndSleeper;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Duration;
@@ -149,7 +149,8 @@ public void testWaitToFinishMessagesFail() throws Exception {
new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, ImmutableMap.of());
State state =
- job.waitUntilFinish(Duration.standardMinutes(5), jobHandler, fastClock, fastClock);
+ job.waitUntilFinish(
+ Duration.standardMinutes(5), jobHandler, fastClock::sleep, fastClock::nanoTime);
assertEquals(null, state);
}
@@ -169,7 +170,8 @@ public State mockWaitToFinishInState(State state) throws Exception {
DataflowPipelineJob job =
new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, ImmutableMap.of());
- return job.waitUntilFinish(Duration.standardMinutes(1), null, fastClock, fastClock);
+ return job.waitUntilFinish(
+ Duration.standardMinutes(1), null, fastClock::sleep, fastClock::nanoTime);
}
/**
@@ -251,7 +253,9 @@ public void testWaitToFinishFail() throws Exception {
new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, ImmutableMap.of());
long startTime = fastClock.nanoTime();
- State state = job.waitUntilFinish(Duration.standardMinutes(5), null, fastClock, fastClock);
+ State state =
+ job.waitUntilFinish(
+ Duration.standardMinutes(5), null, fastClock::sleep, fastClock::nanoTime);
assertEquals(null, state);
long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);
checkValidInterval(
@@ -271,7 +275,8 @@ public void testWaitToFinishTimeFail() throws Exception {
DataflowPipelineJob job =
new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, ImmutableMap.of());
long startTime = fastClock.nanoTime();
- State state = job.waitUntilFinish(Duration.millis(4), null, fastClock, fastClock);
+ State state =
+ job.waitUntilFinish(Duration.millis(4), null, fastClock::sleep, fastClock::nanoTime);
assertEquals(null, state);
long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);
// Should only have slept for the 4 ms allowed.
@@ -318,7 +323,7 @@ public void testGetStateReturnsServiceState() throws Exception {
State.RUNNING,
job.getStateWithRetriesOrUnknownOnException(
BackOffAdapter.toGcpBackOff(DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff()),
- fastClock));
+ fastClock::sleep));
}
@Test
@@ -335,7 +340,7 @@ public void testGetStateWithRetriesPassesExceptionThrough() throws Exception {
thrown.expect(IOException.class);
job.getStateWithRetries(
BackOffAdapter.toGcpBackOff(DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff()),
- fastClock);
+ fastClock::sleep);
}
@Test
@@ -354,7 +359,7 @@ public void testGetStateNoThrowWithExceptionReturnsUnknown() throws Exception {
State.UNKNOWN,
job.getStateWithRetriesOrUnknownOnException(
BackOffAdapter.toGcpBackOff(DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff()),
- fastClock));
+ fastClock::sleep));
long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);
checkValidInterval(
DataflowPipelineJob.STATUS_POLLING_INTERVAL,
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index ca5dd51121915..144d6cec7c515 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -68,7 +68,6 @@
import org.apache.beam.runners.dataflow.util.PackageUtil.PackageAttributes;
import org.apache.beam.runners.dataflow.util.PackageUtil.StagedFile;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
-import org.apache.beam.sdk.extensions.gcp.util.FastNanoClockAndSleeper;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.StorageObjectOrIOException;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
@@ -79,6 +78,7 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.RegexMatcher;
+import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.util.ZipFiles;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
@@ -413,7 +413,7 @@ public void testPackageUploadFailsWhenIOExceptionThrown() throws Exception {
directPackageUtil.stageClasspathElements(
ImmutableList.of(makeStagedFile(tmpFile.getAbsolutePath())),
STAGING_PATH,
- fastNanoClockAndSleeper,
+ fastNanoClockAndSleeper::sleep,
createOptions);
} finally {
verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
@@ -441,7 +441,7 @@ public void testPackageUploadFailsWithPermissionsErrorGivesDetailedMessage() thr
directPackageUtil.stageClasspathElements(
ImmutableList.of(makeStagedFile(tmpFile.getAbsolutePath())),
STAGING_PATH,
- fastNanoClockAndSleeper,
+ fastNanoClockAndSleeper::sleep,
createOptions);
fail("Expected RuntimeException");
} catch (RuntimeException e) {
@@ -484,7 +484,7 @@ public void testPackageUploadEventuallySucceeds() throws Exception {
directPackageUtil.stageClasspathElements(
ImmutableList.of(makeStagedFile(tmpFile.getAbsolutePath())),
STAGING_PATH,
- fastNanoClockAndSleeper,
+ fastNanoClockAndSleeper::sleep,
createOptions);
} finally {
verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorkerTest.java
index 6b55d313ddf43..d9ccda857332c 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorkerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorkerTest.java
@@ -39,8 +39,8 @@
import java.util.ArrayList;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.util.TimeUtil;
-import org.apache.beam.sdk.extensions.gcp.util.FastNanoClockAndSleeper;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.hamcrest.Description;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarnessTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarnessTest.java
index 34517435ad009..31052fb49bebe 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarnessTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarnessTest.java
@@ -30,10 +30,10 @@
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.worker.testing.RestoreDataflowLoggingMDC;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
-import org.apache.beam.sdk.extensions.gcp.util.FastNanoClockAndSleeper;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.RestoreSystemProperties;
+import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
index b96ee2fb84cbf..a7b53956e65af 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
@@ -38,10 +38,10 @@
import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC;
import org.apache.beam.runners.dataflow.worker.testing.RestoreDataflowLoggingMDC;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
-import org.apache.beam.sdk.extensions.gcp.util.FastNanoClockAndSleeper;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.RestoreSystemProperties;
+import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NanoClock.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NanoClock.java
new file mode 100644
index 0000000000000..1f58f25b9315e
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NanoClock.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+/**
+ * Nano clock which can be used to measure elapsed time in nanoseconds.
+ *
+ *
The default system implementation can be accessed at {@link #SYSTEM}. Alternative
+ * implementations may be used for testing.
+ */
+@FunctionalInterface
+interface NanoClock {
+
+ /**
+ * Returns the current value of the most precise available system timer, in nanoseconds for use to
+ * measure elapsed time, to match the behavior of {@link System#nanoTime()}.
+ */
+ long nanoTime();
+
+ /**
+ * Provides the default System implementation of a nano clock by using {@link System#nanoTime()}.
+ */
+ NanoClock SYSTEM = System::nanoTime;
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java
new file mode 100644
index 0000000000000..57a1a829f10c5
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.RunnableScheduledFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.LongMath;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.checkerframework.checker.nullness.qual.KeyForBottom;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An unbounded {@link ScheduledExecutorService} based upon the {@link ScheduledThreadPoolExecutor}
+ * API contract.
+ *
+ *
Note that this implementation differs from a {@link ScheduledThreadPoolExecutor} in the
+ * following ways:
+ *
+ *
+ * - The core pool size is always 0.
+ *
- Any work that is immediately executable is given to a thread before returning from the
+ * corresponding {@code execute}, {@code submit}, {@code schedule*} methods.
+ *
- An unbounded number of threads can be started.
+ *
+ */
+public final class UnboundedScheduledExecutorService implements ScheduledExecutorService {
+
+ /**
+ * A {@link FutureTask} that handles periodically rescheduling tasks.
+ *
+ * Note that it is important that this class extends {@link FutureTask} and {@link
+ * RunnableScheduledFuture} to be compatible with the types of objects returned by a {@link
+ * ScheduledThreadPoolExecutor}.
+ */
+ @VisibleForTesting
+ @SuppressFBWarnings(
+ value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
+ justification =
+ "Default equals/hashCode is what we want since two scheduled tasks are only equivalent if they point to the same instance.")
+ final class ScheduledFutureTask<@Nullable @KeyForBottom V> extends FutureTask
+ implements RunnableScheduledFuture {
+
+ /** Sequence number to break ties FIFO. */
+ private final long sequenceNumber;
+
+ /** The time the task is enabled to execute in nanoTime units. */
+ private long time;
+
+ /**
+ * Period in nanoseconds for repeating tasks. A positive value indicates fixed-rate execution. A
+ * negative value indicates fixed-delay execution. A value of 0 indicates a non-repeating
+ * (one-shot) task.
+ */
+ private final long period;
+
+ /** Creates a one-shot action with given nanoTime-based trigger time. */
+ ScheduledFutureTask(Runnable r, @Nullable V result, long triggerTime) {
+ this(r, result, triggerTime, 0);
+ }
+
+ /** Creates a periodic action with given nanoTime-based initial trigger time and period. */
+ @SuppressWarnings("argument.type.incompatible")
+ ScheduledFutureTask(Runnable r, @Nullable V result, long triggerTime, long period) {
+ super(r, result);
+ this.time = triggerTime;
+ this.period = period;
+ this.sequenceNumber = sequencer.getAndIncrement();
+ }
+
+ /** Creates a one-shot action with given nanoTime-based trigger time. */
+ ScheduledFutureTask(Callable callable, long triggerTime) {
+ super(callable);
+ this.time = triggerTime;
+ this.period = 0;
+ this.sequenceNumber = sequencer.getAndIncrement();
+ }
+
+ @Override
+ public long getDelay(TimeUnit unit) {
+ return unit.convert(LongMath.saturatedSubtract(time, clock.nanoTime()), NANOSECONDS);
+ }
+
+ @Override
+ public int compareTo(Delayed other) {
+ if (other == this) // compare zero if same object
+ {
+ return 0;
+ }
+ if (other instanceof ScheduledFutureTask) {
+ ScheduledFutureTask> x = (ScheduledFutureTask>) other;
+ int diff = Longs.compare(time, x.time);
+ if (diff != 0) {
+ return diff;
+ }
+ if (sequenceNumber < x.sequenceNumber) {
+ return -1;
+ }
+ return 1;
+ }
+ long diff = LongMath.saturatedSubtract(getDelay(NANOSECONDS), other.getDelay(NANOSECONDS));
+ return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
+ }
+
+ @Override
+ public boolean isPeriodic() {
+ return period != 0;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ boolean cancelled = super.cancel(mayInterruptIfRunning);
+ synchronized (tasks) {
+ tasks.remove(this);
+ }
+ return cancelled;
+ }
+
+ /** Overrides {@link FutureTask} so as to reset/requeue if periodic. */
+ @Override
+ public void run() {
+ boolean periodic = isPeriodic();
+ if (!periodic) {
+ super.run();
+ } else if (super.runAndReset()) {
+ // Set the next runtime
+ if (period > 0) {
+ time = LongMath.saturatedAdd(time, period);
+ } else {
+ time = triggerTime(-period);
+ }
+ synchronized (tasks) {
+ tasks.add(this);
+ tasks.notify();
+ }
+ }
+ }
+ }
+
+ // Used to break ties in ordering of future tasks that are scheduled for the same time
+ // so that they have a consistent ordering based upon their insertion order into
+ // this ScheduledExecutorService.
+ private final AtomicLong sequencer = new AtomicLong();
+
+ private final NanoClock clock;
+ private final ThreadPoolExecutor threadPoolExecutor;
+ @VisibleForTesting final PriorityQueue> tasks;
+ private final AbstractExecutorService invokeMethodsAdapter;
+ private final Future> launchTasks;
+
+ public UnboundedScheduledExecutorService() {
+ this(NanoClock.SYSTEM);
+ }
+
+ @VisibleForTesting
+ UnboundedScheduledExecutorService(NanoClock clock) {
+ this.clock = clock;
+ ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
+ threadFactoryBuilder.setThreadFactory(MoreExecutors.platformThreadFactory());
+ threadFactoryBuilder.setDaemon(true);
+
+ this.threadPoolExecutor =
+ new ThreadPoolExecutor(
+ 0,
+ Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads.
+ Long.MAX_VALUE,
+ TimeUnit.NANOSECONDS, // Keep non-core threads alive forever.
+ new SynchronousQueue<>(),
+ threadFactoryBuilder.build());
+
+ // Create an internal adapter so that execute does not re-wrap the ScheduledFutureTask again
+ this.invokeMethodsAdapter =
+ new AbstractExecutorService() {
+
+ @Override
+ protected <@KeyForBottom T> RunnableFuture newTaskFor(Runnable runnable, T value) {
+ return new ScheduledFutureTask<>(runnable, value, 0);
+ }
+
+ @Override
+ protected <@KeyForBottom T> RunnableFuture newTaskFor(Callable callable) {
+ return new ScheduledFutureTask<>(callable, 0);
+ }
+
+ @Override
+ public void shutdown() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List shutdownNow() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ /* UnboundedScheduledExecutorService is the only caller after it has been initialized.*/
+ @SuppressWarnings("method.invocation.invalid")
+ public void execute(Runnable command) {
+ // These are already guaranteed to be a ScheduledFutureTask so there is no need to wrap
+ // it in another ScheduledFutureTask.
+ threadPoolExecutor.execute(command);
+ }
+ };
+ this.tasks = new PriorityQueue<>();
+ this.launchTasks =
+ threadPoolExecutor.submit(new TaskLauncher(tasks, threadPoolExecutor, clock));
+ }
+
+ private static class TaskLauncher implements Callable {
+ private final PriorityQueue> tasks;
+ private final ThreadPoolExecutor threadPoolExecutor;
+ private final NanoClock clock;
+
+ private TaskLauncher(
+ PriorityQueue> tasks,
+ ThreadPoolExecutor threadPoolExecutor,
+ NanoClock clock) {
+ this.tasks = tasks;
+ this.threadPoolExecutor = threadPoolExecutor;
+ this.clock = clock;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ while (true) {
+ synchronized (tasks) {
+ if (threadPoolExecutor.isShutdown()) {
+ return null;
+ }
+ ScheduledFutureTask> task = tasks.peek();
+ if (task == null) {
+ tasks.wait();
+ continue;
+ }
+ long nanosToWait = LongMath.saturatedSubtract(task.time, clock.nanoTime());
+ if (nanosToWait > 0) {
+ long millisToWait = nanosToWait / 1_000_000;
+ int nanosRemainder = (int) (nanosToWait % 1_000_000);
+ tasks.wait(millisToWait, nanosRemainder);
+ continue;
+ }
+ // Remove the task from the queue since it is ready to be scheduled now
+ task = tasks.remove();
+ threadPoolExecutor.execute(task);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ threadPoolExecutor.shutdown();
+ synchronized (tasks) {
+ // Notify tasks which checks to see if the ThreadPoolExecutor is shutdown and exits cleanly.
+ tasks.notify();
+ }
+
+ // Re-throw any errors during shutdown of the launchTasks thread.
+ try {
+ launchTasks.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e.getCause());
+ }
+ }
+
+ @Override
+ public List shutdownNow() {
+ shutdown();
+ synchronized (tasks) {
+ List rval = new ArrayList<>(tasks);
+ tasks.clear();
+ rval.addAll(threadPoolExecutor.shutdownNow());
+ return rval;
+ }
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return threadPoolExecutor.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return threadPoolExecutor.isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return threadPoolExecutor.awaitTermination(timeout, unit);
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ if (command == null) {
+ throw new NullPointerException();
+ }
+ ScheduledFutureTask task = new ScheduledFutureTask<>(command, null, triggerTime(0));
+ threadPoolExecutor.execute(task);
+ }
+
+ @Override
+ public Future<@Nullable ?> submit(Runnable command) {
+ if (command == null) {
+ throw new NullPointerException();
+ }
+ ScheduledFutureTask task = new ScheduledFutureTask<>(command, null, triggerTime(0));
+ threadPoolExecutor.execute(task);
+ return task;
+ }
+
+ @Override
+ /* Ignore improper flag since FB detects that ScheduledExecutorService can't have nullable V. */
+ @SuppressWarnings("override.return.invalid")
+ public <@Nullable @KeyForBottom T> Future submit(Runnable command, T result) {
+ if (command == null) {
+ throw new NullPointerException();
+ }
+ ScheduledFutureTask task = new ScheduledFutureTask<>(command, result, triggerTime(0));
+ runNowOrScheduleInTheFuture(task);
+ return task;
+ }
+
+ @Override
+ /* Ignore improper flag since FB detects that ScheduledExecutorService can't have nullable V. */
+ @SuppressWarnings({"override.param.invalid", "override.return.invalid"})
+ public <@Nullable @KeyForBottom T> Future submit(Callable command) {
+ if (command == null) {
+ throw new NullPointerException();
+ }
+ ScheduledFutureTask task = new ScheduledFutureTask<>(command, triggerTime(0));
+ threadPoolExecutor.execute(task);
+ return task;
+ }
+
+ @Override
+ public <@KeyForBottom T> List> invokeAll(Collection extends Callable> tasks)
+ throws InterruptedException {
+ return invokeMethodsAdapter.invokeAll(tasks);
+ }
+
+ @Override
+ public <@KeyForBottom T> List> invokeAll(
+ Collection extends Callable> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ if (tasks == null || unit == null) {
+ throw new NullPointerException();
+ }
+ return invokeMethodsAdapter.invokeAll(tasks, timeout, unit);
+ }
+
+ @Override
+ public <@KeyForBottom T> T invokeAny(Collection extends Callable> tasks)
+ throws InterruptedException, ExecutionException {
+ return invokeMethodsAdapter.invokeAny(tasks);
+ }
+
+ @Override
+ public <@KeyForBottom T> T invokeAny(
+ Collection extends Callable> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return invokeMethodsAdapter.invokeAny(tasks, timeout, unit);
+ }
+
+ @Override
+ public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit) {
+ if (command == null || unit == null) {
+ throw new NullPointerException();
+ }
+ ScheduledFutureTask task =
+ new ScheduledFutureTask<>(command, null, triggerTime(delay, unit));
+ runNowOrScheduleInTheFuture(task);
+ return task;
+ }
+
+ @Override
+ /* Ignore improper flag since FB detects that ScheduledExecutorService can't have nullable V. */
+ @SuppressWarnings({"override.param.invalid", "override.return.invalid"})
+ public <@Nullable @KeyForBottom V> ScheduledFuture schedule(
+ Callable callable, long delay, TimeUnit unit) {
+ if (callable == null || unit == null) {
+ throw new NullPointerException();
+ }
+ ScheduledFutureTask task = new ScheduledFutureTask<>(callable, triggerTime(delay, unit));
+ runNowOrScheduleInTheFuture(task);
+ return task;
+ }
+
+ @Override
+ public ScheduledFuture> scheduleAtFixedRate(
+ Runnable command, long initialDelay, long period, TimeUnit unit) {
+ if (command == null || unit == null) {
+ throw new NullPointerException();
+ }
+ if (period <= 0) {
+ throw new IllegalArgumentException();
+ }
+ ScheduledFutureTask task =
+ new ScheduledFutureTask(
+ command, null, triggerTime(initialDelay, unit), unit.toNanos(period));
+ runNowOrScheduleInTheFuture(task);
+ return task;
+ }
+
+ @Override
+ public ScheduledFuture> scheduleWithFixedDelay(
+ Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ if (command == null || unit == null) {
+ throw new NullPointerException();
+ }
+ if (delay <= 0) {
+ throw new IllegalArgumentException();
+ }
+ ScheduledFutureTask task =
+ new ScheduledFutureTask<>(
+ command, null, triggerTime(initialDelay, unit), -unit.toNanos(delay));
+ runNowOrScheduleInTheFuture(task);
+ return task;
+ }
+
+ private <@Nullable @KeyForBottom T> void runNowOrScheduleInTheFuture(
+ ScheduledFutureTask task) {
+ long nanosToWait = LongMath.saturatedSubtract(task.time, clock.nanoTime());
+ if (nanosToWait <= 0) {
+ threadPoolExecutor.execute(task);
+ return;
+ }
+
+ synchronized (tasks) {
+ if (isShutdown()) {
+ threadPoolExecutor
+ .getRejectedExecutionHandler()
+ .rejectedExecution(task, threadPoolExecutor);
+ }
+ tasks.add(task);
+ tasks.notify();
+ }
+ }
+
+ /** Returns the nanoTime-based trigger time of a delayed action. */
+ private long triggerTime(long delay, TimeUnit unit) {
+ return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
+ }
+
+ /** Returns the nanoTime-based trigger time of a delayed action. */
+ private long triggerTime(long delay) {
+ return LongMath.saturatedAdd(clock.nanoTime(), delay);
+ }
+}
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/FastNanoClockAndSleeper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeper.java
similarity index 82%
rename from sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/FastNanoClockAndSleeper.java
rename to sdks/java/core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeper.java
index dd6f92cb9884f..331b2256f5a12 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/FastNanoClockAndSleeper.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeper.java
@@ -15,10 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.gcp.util;
+package org.apache.beam.sdk.util;
-import com.google.api.client.util.NanoClock;
-import com.google.api.client.util.Sleeper;
+import java.util.concurrent.atomic.AtomicLong;
import org.junit.rules.ExternalResource;
import org.junit.rules.TestRule;
@@ -28,20 +27,20 @@
*/
public class FastNanoClockAndSleeper extends ExternalResource
implements NanoClock, Sleeper, TestRule {
- private long fastNanoTime;
+ private AtomicLong fastNanoTime = new AtomicLong();
@Override
public long nanoTime() {
- return fastNanoTime;
+ return fastNanoTime.get();
}
@Override
protected void before() throws Throwable {
- fastNanoTime = SYSTEM.nanoTime();
+ fastNanoTime = new AtomicLong(NanoClock.SYSTEM.nanoTime());
}
@Override
public void sleep(long millis) throws InterruptedException {
- fastNanoTime += millis * 1000000L;
+ fastNanoTime.addAndGet(millis * 1000000L);
}
}
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/FastNanoClockAndSleeperTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeperTest.java
similarity index 97%
rename from sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/FastNanoClockAndSleeperTest.java
rename to sdks/java/core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeperTest.java
index 88a55e7234f1a..03f958850e9c0 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/FastNanoClockAndSleeperTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeperTest.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.gcp.util;
+package org.apache.beam.sdk.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java
new file mode 100644
index 0000000000000..ededf69c914a8
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsEmptyCollection.empty;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.verifyNoInteractions;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.util.UnboundedScheduledExecutorService.ScheduledFutureTask;
+import org.hamcrest.collection.IsIterableContainingInOrder;
+import org.hamcrest.core.IsIterableContaining;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+/** Tests for {@link UnboundedScheduledExecutorService}. */
+@RunWith(JUnit4.class)
+public class UnboundedScheduledExecutorServiceTest {
+
+ private static final Runnable RUNNABLE =
+ () -> {
+ // no-op
+ };
+ private static final Callable CALLABLE = () -> "A";
+
+ private static final Callable FAILING_CALLABLE =
+ () -> {
+ throw new Exception("Test");
+ };
+
+ @Test
+ public void testScheduleMethodErrorChecking() throws Exception {
+ FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+ UnboundedScheduledExecutorService executorService =
+ new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+ UnboundedScheduledExecutorService shutdownExecutorService =
+ new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+ shutdownExecutorService.shutdown();
+
+ assertThrows(
+ NullPointerException.class, () -> executorService.schedule((Runnable) null, 10, SECONDS));
+ assertThrows(NullPointerException.class, () -> executorService.schedule(RUNNABLE, 10, null));
+ assertThrows(
+ RejectedExecutionException.class,
+ () -> shutdownExecutorService.schedule(RUNNABLE, 10, SECONDS));
+
+ assertThrows(
+ NullPointerException.class,
+ () -> executorService.schedule((Callable) null, 10, SECONDS));
+ assertThrows(NullPointerException.class, () -> executorService.schedule(CALLABLE, 10, null));
+ assertThrows(
+ RejectedExecutionException.class,
+ () -> shutdownExecutorService.schedule(CALLABLE, 10, SECONDS));
+
+ assertThrows(
+ NullPointerException.class,
+ () -> executorService.scheduleAtFixedRate(null, 10, 10, SECONDS));
+ assertThrows(
+ NullPointerException.class,
+ () -> executorService.scheduleAtFixedRate(RUNNABLE, 10, 10, null));
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> executorService.scheduleAtFixedRate(RUNNABLE, 10, -10, SECONDS));
+ assertThrows(
+ RejectedExecutionException.class,
+ () -> shutdownExecutorService.scheduleAtFixedRate(RUNNABLE, 10, 10, SECONDS));
+
+ assertThrows(
+ NullPointerException.class,
+ () -> executorService.scheduleWithFixedDelay((Runnable) null, 10, 10, SECONDS));
+ assertThrows(
+ NullPointerException.class,
+ () -> executorService.scheduleWithFixedDelay(RUNNABLE, 10, 10, null));
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> executorService.scheduleWithFixedDelay(RUNNABLE, 10, -10, SECONDS));
+ assertThrows(
+ RejectedExecutionException.class,
+ () -> shutdownExecutorService.scheduleWithFixedDelay(RUNNABLE, 10, 10, SECONDS));
+
+ assertThat(executorService.shutdownNow(), empty());
+ assertThat(executorService.shutdownNow(), empty());
+ }
+
+ @Test
+ public void testSubmitMethodErrorChecking() throws Exception {
+ FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+ UnboundedScheduledExecutorService executorService =
+ new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+ UnboundedScheduledExecutorService shutdownExecutorService =
+ new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+ shutdownExecutorService.shutdown();
+
+ assertThrows(NullPointerException.class, () -> executorService.submit(null, "result"));
+ assertThrows(
+ RejectedExecutionException.class, () -> shutdownExecutorService.submit(RUNNABLE, "result"));
+
+ assertThrows(NullPointerException.class, () -> executorService.submit((Runnable) null));
+ assertThrows(RejectedExecutionException.class, () -> shutdownExecutorService.submit(RUNNABLE));
+
+ assertThrows(NullPointerException.class, () -> executorService.submit((Callable) null));
+ assertThrows(RejectedExecutionException.class, () -> shutdownExecutorService.submit(CALLABLE));
+
+ assertThat(executorService.shutdownNow(), empty());
+ assertThat(executorService.shutdownNow(), empty());
+ }
+
+ @Test
+ public void testInvokeMethodErrorChecking() throws Exception {
+ FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+ UnboundedScheduledExecutorService executorService =
+ new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+ UnboundedScheduledExecutorService shutdownExecutorService =
+ new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+ shutdownExecutorService.shutdown();
+
+ assertThrows(NullPointerException.class, () -> executorService.invokeAll(null));
+ assertThrows(
+ NullPointerException.class, () -> executorService.invokeAll(Collections.singleton(null)));
+ assertThrows(
+ RejectedExecutionException.class,
+ () -> shutdownExecutorService.invokeAll(Collections.singleton(CALLABLE)));
+
+ assertThrows(NullPointerException.class, () -> executorService.invokeAll(null, 10, SECONDS));
+ assertThrows(
+ NullPointerException.class,
+ () -> executorService.invokeAll(Collections.singleton(null), 10, SECONDS));
+ assertThrows(
+ NullPointerException.class,
+ () -> executorService.invokeAll(Collections.singleton(CALLABLE), 10, null));
+ assertThrows(
+ RejectedExecutionException.class,
+ () -> shutdownExecutorService.invokeAll(Collections.singleton(CALLABLE), 10, SECONDS));
+
+ assertThrows(NullPointerException.class, () -> executorService.invokeAny(null));
+ assertThrows(
+ NullPointerException.class, () -> executorService.invokeAny(Collections.singleton(null)));
+ assertThrows(
+ IllegalArgumentException.class, () -> executorService.invokeAny(Collections.emptyList()));
+ assertThrows(
+ ExecutionException.class,
+ () -> executorService.invokeAny(Arrays.asList(FAILING_CALLABLE, FAILING_CALLABLE)));
+ assertThrows(
+ RejectedExecutionException.class,
+ () -> shutdownExecutorService.invokeAny(Collections.singleton(CALLABLE)));
+
+ assertThrows(NullPointerException.class, () -> executorService.invokeAny(null, 10, SECONDS));
+ assertThrows(
+ NullPointerException.class,
+ () -> executorService.invokeAny(Collections.singleton(null), 10, SECONDS));
+ assertThrows(
+ NullPointerException.class,
+ () -> executorService.invokeAny(Collections.singleton(CALLABLE), 10, null));
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> executorService.invokeAny(Collections.emptyList(), 10, SECONDS));
+ assertThrows(
+ ExecutionException.class,
+ () ->
+ executorService.invokeAny(
+ Arrays.asList(FAILING_CALLABLE, FAILING_CALLABLE), 10, SECONDS));
+ assertThrows(
+ RejectedExecutionException.class,
+ () -> shutdownExecutorService.invokeAny(Collections.singleton(CALLABLE), 10, SECONDS));
+
+ assertThat(executorService.shutdownNow(), empty());
+ assertThat(executorService.shutdownNow(), empty());
+ }
+
+ @Test
+ public void testExecuteMethodErrorChecking() throws Exception {
+ FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+ UnboundedScheduledExecutorService executorService =
+ new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+ UnboundedScheduledExecutorService shutdownExecutorService =
+ new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+ shutdownExecutorService.shutdown();
+
+ assertThrows(NullPointerException.class, () -> executorService.execute(null));
+ assertThrows(RejectedExecutionException.class, () -> shutdownExecutorService.execute(RUNNABLE));
+
+ assertThat(executorService.shutdownNow(), empty());
+ assertThat(executorService.shutdownNow(), empty());
+ }
+
+ @Test
+ public void testAllMethodsReturnScheduledFutures() throws Exception {
+ FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+ UnboundedScheduledExecutorService executorService =
+ new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+
+ assertThat(executorService.submit(RUNNABLE), instanceOf(ScheduledFutureTask.class));
+ assertThat(executorService.submit(CALLABLE), instanceOf(ScheduledFutureTask.class));
+ assertThat(executorService.submit(RUNNABLE, "Answer"), instanceOf(ScheduledFutureTask.class));
+
+ assertThat(
+ executorService.schedule(RUNNABLE, 10, SECONDS), instanceOf(ScheduledFutureTask.class));
+ assertThat(
+ executorService.schedule(CALLABLE, 10, SECONDS), instanceOf(ScheduledFutureTask.class));
+ assertThat(
+ executorService.scheduleAtFixedRate(RUNNABLE, 10, 10, SECONDS),
+ instanceOf(ScheduledFutureTask.class));
+ assertThat(
+ executorService.scheduleWithFixedDelay(RUNNABLE, 10, 10, SECONDS),
+ instanceOf(ScheduledFutureTask.class));
+
+ assertThat(
+ executorService.invokeAll(Arrays.asList(CALLABLE, CALLABLE)),
+ IsIterableContainingInOrder.contains(
+ instanceOf(ScheduledFutureTask.class), instanceOf(ScheduledFutureTask.class)));
+ assertThat(
+ executorService.invokeAll(Arrays.asList(CALLABLE, CALLABLE), 10, SECONDS),
+ IsIterableContainingInOrder.contains(
+ instanceOf(ScheduledFutureTask.class), instanceOf(ScheduledFutureTask.class)));
+
+ executorService.shutdownNow();
+ }
+
+ @Test
+ public void testShutdown() throws Exception {
+ FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+ UnboundedScheduledExecutorService executorService =
+ new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+
+ Runnable runnable1 = Mockito.mock(Runnable.class);
+ Runnable runnable2 = Mockito.mock(Runnable.class);
+ Runnable runnable3 = Mockito.mock(Runnable.class);
+ Callable> callable1 = Mockito.mock(Callable.class);
+
+ Future> rFuture1 = executorService.schedule(runnable1, 10, SECONDS);
+ Future> cFuture1 = executorService.schedule(callable1, 10, SECONDS);
+ Future> rFuture2 = executorService.scheduleAtFixedRate(runnable2, 10, 10, SECONDS);
+ Future> rFuture3 = executorService.scheduleWithFixedDelay(runnable3, 10, 10, SECONDS);
+
+ assertThat(
+ executorService.shutdownNow(),
+ IsIterableContaining.hasItems(
+ (Runnable) rFuture1, (Runnable) rFuture2, (Runnable) rFuture3, (Runnable) cFuture1));
+ verifyNoInteractions(runnable1, runnable2, runnable3, callable1);
+
+ assertTrue(executorService.isShutdown());
+ assertTrue(executorService.awaitTermination(10, SECONDS));
+ assertTrue(executorService.isTerminated());
+ }
+
+ @Test
+ public void testExecute() throws Exception {
+ FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+ UnboundedScheduledExecutorService executorService =
+ new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+
+ AtomicInteger callCount = new AtomicInteger();
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+
+ executorService.execute(
+ () -> {
+ callCount.incrementAndGet();
+ countDownLatch.countDown();
+ });
+
+ countDownLatch.await();
+ assertEquals(1, callCount.get());
+ }
+
+ @Test
+ public void testSubmit() throws Exception {
+ List callCounts = new ArrayList<>();
+ List> futures = new ArrayList<>();
+
+ FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+ UnboundedScheduledExecutorService executorService =
+ new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+
+ callCounts.add(new AtomicInteger());
+ futures.add(
+ (ScheduledFutureTask>)
+ executorService.submit(
+ (Runnable) callCounts.get(callCounts.size() - 1)::incrementAndGet));
+ callCounts.add(new AtomicInteger());
+ futures.add(
+ (ScheduledFutureTask>)
+ executorService.submit(
+ callCounts.get(callCounts.size() - 1)::incrementAndGet, "Result"));
+ callCounts.add(new AtomicInteger());
+ futures.add(
+ (ScheduledFutureTask>)
+ executorService.submit(callCounts.get(callCounts.size() - 1)::incrementAndGet));
+
+ assertNull(futures.get(0).get());
+ assertEquals("Result", futures.get(1).get());
+ assertEquals(1, futures.get(2).get());
+
+ for (int i = 0; i < callCounts.size(); ++i) {
+ assertFalse(futures.get(i).isPeriodic());
+ assertEquals(1, callCounts.get(i).get());
+ }
+ }
+
+ @Test
+ public void testSchedule() throws Exception {
+ List callCounts = new ArrayList<>();
+ List> futures = new ArrayList<>();
+
+ FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+ UnboundedScheduledExecutorService executorService =
+ new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+
+ callCounts.add(new AtomicInteger());
+ futures.add(
+ (ScheduledFutureTask>)
+ executorService.schedule(
+ (Runnable) callCounts.get(callCounts.size() - 1)::incrementAndGet,
+ 100,
+ MILLISECONDS));
+ callCounts.add(new AtomicInteger());
+ futures.add(
+ (ScheduledFutureTask>)
+ executorService.schedule(
+ callCounts.get(callCounts.size() - 1)::incrementAndGet, 100, MILLISECONDS));
+
+ // No tasks should have been picked up
+ wakeUpAndCheckTasks(executorService);
+ for (int i = 0; i < callCounts.size(); ++i) {
+ assertEquals(0, callCounts.get(i).get());
+ }
+
+ // No tasks should have been picked up even if the time advances 99 seconds
+ fastNanoClockAndSleeper.sleep(99);
+ wakeUpAndCheckTasks(executorService);
+ for (int i = 0; i < callCounts.size(); ++i) {
+ assertEquals(0, callCounts.get(i).get());
+ }
+
+ // All tasks should wake up and pick-up tasks
+ fastNanoClockAndSleeper.sleep(1);
+ wakeUpAndCheckTasks(executorService);
+
+ assertNull(futures.get(0).get());
+ assertEquals(1, futures.get(1).get());
+
+ for (int i = 0; i < callCounts.size(); ++i) {
+ assertFalse(futures.get(i).isPeriodic());
+ assertEquals(1, callCounts.get(i).get());
+ }
+
+ assertThat(executorService.shutdownNow(), empty());
+ }
+
+ @Test
+ public void testSchedulePeriodicWithFixedDelay() throws Exception {
+ FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+ UnboundedScheduledExecutorService executorService =
+ new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+
+ AtomicInteger callCount = new AtomicInteger();
+ CountDownLatch latch = new CountDownLatch(1);
+
+ ScheduledFutureTask> future =
+ (ScheduledFutureTask>)
+ executorService.scheduleWithFixedDelay(
+ () -> {
+ callCount.incrementAndGet();
+ latch.countDown();
+ },
+ 100,
+ 50,
+ MILLISECONDS);
+
+ // No tasks should have been picked up
+ wakeUpAndCheckTasks(executorService);
+ assertEquals(0, callCount.get());
+
+ // No tasks should have been picked up even if the time advances 99 seconds
+ fastNanoClockAndSleeper.sleep(99);
+ wakeUpAndCheckTasks(executorService);
+ assertEquals(0, callCount.get());
+
+ // We should have picked up the task 1 time, next task should be scheduled in 50 even though we
+ // advanced to 109
+ fastNanoClockAndSleeper.sleep(10);
+ wakeUpAndCheckTasks(executorService);
+ latch.await();
+ assertEquals(1, callCount.get());
+
+ for (; ; ) {
+ synchronized (executorService.tasks) {
+ ScheduledFutureTask> task = executorService.tasks.peek();
+ if (task != null) {
+ assertEquals(50, task.getDelay(MILLISECONDS));
+ break;
+ }
+ }
+ Thread.sleep(1);
+ }
+
+ assertTrue(future.isPeriodic());
+ assertFalse(future.isDone());
+
+ future.cancel(true);
+ assertTrue(future.isCancelled());
+ assertTrue(future.isDone());
+
+ // Cancelled tasks should not be returned during shutdown
+ assertThat(executorService.shutdownNow(), empty());
+ }
+
+ @Test
+ public void testSchedulePeriodicWithFixedRate() throws Exception {
+ FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+ UnboundedScheduledExecutorService executorService =
+ new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+
+ AtomicInteger callCount = new AtomicInteger();
+ CountDownLatch latch = new CountDownLatch(1);
+
+ ScheduledFutureTask> future =
+ (ScheduledFutureTask>)
+ executorService.scheduleAtFixedRate(
+ () -> {
+ callCount.incrementAndGet();
+ latch.countDown();
+ },
+ 100,
+ 50,
+ MILLISECONDS);
+
+ // No tasks should have been picked up
+ wakeUpAndCheckTasks(executorService);
+ assertEquals(0, callCount.get());
+
+ // No tasks should have been picked up even if the time advances 99 seconds
+ fastNanoClockAndSleeper.sleep(99);
+ wakeUpAndCheckTasks(executorService);
+ assertEquals(0, callCount.get());
+
+ // We should have picked up the task 1 time, next task should be scheduled in 41 since we
+ // advanced to 109
+ fastNanoClockAndSleeper.sleep(10);
+ wakeUpAndCheckTasks(executorService);
+ latch.await();
+ assertEquals(1, callCount.get());
+
+ for (; ; ) {
+ synchronized (executorService.tasks) {
+ ScheduledFutureTask> task = executorService.tasks.peek();
+ if (task != null) {
+ assertEquals(41, task.getDelay(MILLISECONDS));
+ break;
+ }
+ }
+ Thread.sleep(1);
+ }
+
+ assertTrue(future.isPeriodic());
+ assertFalse(future.isDone());
+
+ future.cancel(true);
+ assertTrue(future.isCancelled());
+ assertTrue(future.isDone());
+
+ // Cancelled tasks should not be returned during shutdown
+ assertThat(executorService.shutdownNow(), empty());
+ }
+
+ void wakeUpAndCheckTasks(UnboundedScheduledExecutorService executorService) throws Exception {
+ synchronized (executorService.tasks) {
+ executorService.tasks.notify();
+ }
+ Thread.sleep(100);
+ }
+}
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
index d183d1647e112..0b14b244da5e2 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
@@ -20,9 +20,6 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator;
@@ -35,8 +32,7 @@
import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.beam.sdk.util.UnboundedScheduledExecutorService;
import org.checkerframework.checker.nullness.qual.Nullable;
/** Options used to configure Google Cloud Storage. */
@@ -134,12 +130,8 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline
* ExecutorService} is compatible with AppEngine.
*/
class ExecutorServiceFactory implements DefaultValueFactory {
- @SuppressWarnings("deprecation") // IS_APP_ENGINE is deprecated for internal use only.
@Override
public ExecutorService create(PipelineOptions options) {
- ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
- threadFactoryBuilder.setThreadFactory(MoreExecutors.platformThreadFactory());
- threadFactoryBuilder.setDaemon(true);
/* The SDK requires an unbounded thread pool because a step may create X writers
* each requiring their own thread to perform the writes otherwise a writer may
* block causing deadlock for the step because the writers buffer is full.
@@ -147,13 +139,7 @@ public ExecutorService create(PipelineOptions options) {
* them in forward order thus requiring enough threads so that each step's writers
* can be active.
*/
- return new ThreadPoolExecutor(
- 0,
- Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads.
- Long.MAX_VALUE,
- TimeUnit.NANOSECONDS, // Keep non-core threads alive forever.
- new SynchronousQueue<>(),
- threadFactoryBuilder.build());
+ return new UnboundedScheduledExecutorService();
}
}
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
index 5014bd7370c10..4ce530de4c2bf 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
@@ -58,8 +58,7 @@ public void testGcpCoreApiSurface() throws Exception {
classesInPackage("java"),
classesInPackage("javax"),
classesInPackage("org.apache.beam.sdk"),
- classesInPackage("org.joda.time"),
- classesInPackage("org.junit"));
+ classesInPackage("org.joda.time"));
assertThat(apiSurface, containsOnlyClassesMatching(allowedClasses));
}
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
index 33a87c6d0ee37..26b270d5faf74 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
@@ -102,6 +102,7 @@
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
@@ -457,7 +458,7 @@ public void testRetryFileSizeNonBatch() throws IOException {
.getObject(
GcsPath.fromComponents("testbucket", "testobject"),
mockBackOff,
- new FastNanoClockAndSleeper())
+ new FastNanoClockAndSleeper()::sleep)
.getSize()
.longValue());
assertEquals(BackOff.STOP, mockBackOff.nextBackOffMillis());
@@ -713,7 +714,7 @@ public void testCreateBucket() throws IOException {
.thenThrow(new SocketTimeoutException("SocketException"))
.thenReturn(new Bucket());
- gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper());
+ gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper()::sleep);
}
@Test
@@ -741,7 +742,7 @@ public void testCreateBucketAccessErrors() throws IOException {
thrown.expect(AccessDeniedException.class);
- gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper());
+ gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper()::sleep);
}
@Test
@@ -767,7 +768,7 @@ public void testBucketAccessible() throws IOException {
gcsUtil.bucketAccessible(
GcsPath.fromComponents("testbucket", "testobject"),
mockBackOff,
- new FastNanoClockAndSleeper()));
+ new FastNanoClockAndSleeper()::sleep));
}
@Test
@@ -796,7 +797,7 @@ public void testBucketDoesNotExistBecauseOfAccessError() throws IOException {
gcsUtil.bucketAccessible(
GcsPath.fromComponents("testbucket", "testobject"),
mockBackOff,
- new FastNanoClockAndSleeper()));
+ new FastNanoClockAndSleeper()::sleep));
}
@Test
@@ -823,7 +824,7 @@ public void testBucketDoesNotExist() throws IOException {
gcsUtil.bucketAccessible(
GcsPath.fromComponents("testbucket", "testobject"),
mockBackOff,
- new FastNanoClockAndSleeper()));
+ new FastNanoClockAndSleeper()::sleep));
}
@Test
@@ -848,7 +849,7 @@ public void testVerifyBucketAccessible() throws IOException {
gcsUtil.verifyBucketAccessible(
GcsPath.fromComponents("testbucket", "testobject"),
mockBackOff,
- new FastNanoClockAndSleeper());
+ new FastNanoClockAndSleeper()::sleep);
}
@Test(expected = AccessDeniedException.class)
@@ -876,7 +877,7 @@ public void testVerifyBucketAccessibleAccessError() throws IOException {
gcsUtil.verifyBucketAccessible(
GcsPath.fromComponents("testbucket", "testobject"),
mockBackOff,
- new FastNanoClockAndSleeper());
+ new FastNanoClockAndSleeper()::sleep);
}
@Test(expected = FileNotFoundException.class)
@@ -902,7 +903,7 @@ public void testVerifyBucketAccessibleDoesNotExist() throws IOException {
gcsUtil.verifyBucketAccessible(
GcsPath.fromComponents("testbucket", "testobject"),
mockBackOff,
- new FastNanoClockAndSleeper());
+ new FastNanoClockAndSleeper()::sleep);
}
@Test
@@ -928,7 +929,7 @@ public void testGetBucket() throws IOException {
gcsUtil.getBucket(
GcsPath.fromComponents("testbucket", "testobject"),
mockBackOff,
- new FastNanoClockAndSleeper()));
+ new FastNanoClockAndSleeper()::sleep));
}
@Test
@@ -956,7 +957,7 @@ public void testGetBucketNotExists() throws IOException {
gcsUtil.getBucket(
GcsPath.fromComponents("testbucket", "testobject"),
mockBackOff,
- new FastNanoClockAndSleeper());
+ new FastNanoClockAndSleeper()::sleep);
}
@Test
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializerTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializerTest.java
index 8bcca0491ea0d..a604b97cd35aa 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializerTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializerTest.java
@@ -55,6 +55,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
@@ -314,7 +315,10 @@ public LowLevelHttpResponse execute() throws IOException {
transport,
Transport.getJsonFactory(),
new RetryHttpRequestInitializer(
- fakeClockAndSleeper, fakeClockAndSleeper, Collections.emptyList(), null))
+ fakeClockAndSleeper::nanoTime,
+ fakeClockAndSleeper::sleep,
+ Collections.emptyList(),
+ null))
.build();
Get getRequest = storage.objects().get("gs://fake", "file");
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index a57299b0a0d79..176bac79da7b2 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -91,7 +91,6 @@
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter;
-import org.apache.beam.sdk.extensions.gcp.util.FastNanoClockAndSleeper;
import org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.DatasetServiceImpl;
@@ -102,6 +101,7 @@
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.values.FailsafeValueInSingleWindow;
import org.apache.beam.sdk.values.ValueInSingleWindow;
@@ -248,7 +248,7 @@ public void testStartLoadJobSucceeds() throws IOException, InterruptedException
when(response.getContent()).thenReturn(toStream(testJob));
});
- Sleeper sleeper = new FastNanoClockAndSleeper();
+ Sleeper sleeper = new FastNanoClockAndSleeper()::sleep;
JobServiceImpl.startJob(
testJob,
new ApiErrorExtractor(),
@@ -277,7 +277,7 @@ public void testStartLoadJobSucceedsAlreadyExists() throws IOException, Interrup
when(response.getStatusCode()).thenReturn(409); // 409 means already exists
});
- Sleeper sleeper = new FastNanoClockAndSleeper();
+ Sleeper sleeper = new FastNanoClockAndSleeper()::sleep;
JobServiceImpl.startJob(
testJob,
new ApiErrorExtractor(),
@@ -312,7 +312,7 @@ public void testStartLoadJobRetry() throws IOException, InterruptedException {
when(response.getContent()).thenReturn(toStream(testJob));
});
- Sleeper sleeper = new FastNanoClockAndSleeper();
+ Sleeper sleeper = new FastNanoClockAndSleeper()::sleep;
JobServiceImpl.startJob(
testJob,
new ApiErrorExtractor(),