Skip to content

Commit

Permalink
Migrate GcsOptions#getExecutorService to an unbounded ScheduledExecut…
Browse files Browse the repository at this point in the history
…orService

This is a preliminary piece for issue #21368
  • Loading branch information
lukecwik committed Oct 8, 2022
1 parent 8216cba commit 686bdb9
Show file tree
Hide file tree
Showing 15 changed files with 1,099 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter;
import org.apache.beam.sdk.extensions.gcp.util.FastNanoClockAndSleeper;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Duration;
Expand Down Expand Up @@ -149,7 +149,8 @@ public void testWaitToFinishMessagesFail() throws Exception {
new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, ImmutableMap.of());

State state =
job.waitUntilFinish(Duration.standardMinutes(5), jobHandler, fastClock, fastClock);
job.waitUntilFinish(
Duration.standardMinutes(5), jobHandler, fastClock::sleep, fastClock::nanoTime);
assertEquals(null, state);
}

Expand All @@ -169,7 +170,8 @@ public State mockWaitToFinishInState(State state) throws Exception {
DataflowPipelineJob job =
new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, ImmutableMap.of());

return job.waitUntilFinish(Duration.standardMinutes(1), null, fastClock, fastClock);
return job.waitUntilFinish(
Duration.standardMinutes(1), null, fastClock::sleep, fastClock::nanoTime);
}

/**
Expand Down Expand Up @@ -251,7 +253,9 @@ public void testWaitToFinishFail() throws Exception {
new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, ImmutableMap.of());

long startTime = fastClock.nanoTime();
State state = job.waitUntilFinish(Duration.standardMinutes(5), null, fastClock, fastClock);
State state =
job.waitUntilFinish(
Duration.standardMinutes(5), null, fastClock::sleep, fastClock::nanoTime);
assertEquals(null, state);
long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);
checkValidInterval(
Expand All @@ -271,7 +275,8 @@ public void testWaitToFinishTimeFail() throws Exception {
DataflowPipelineJob job =
new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, ImmutableMap.of());
long startTime = fastClock.nanoTime();
State state = job.waitUntilFinish(Duration.millis(4), null, fastClock, fastClock);
State state =
job.waitUntilFinish(Duration.millis(4), null, fastClock::sleep, fastClock::nanoTime);
assertEquals(null, state);
long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);
// Should only have slept for the 4 ms allowed.
Expand Down Expand Up @@ -318,7 +323,7 @@ public void testGetStateReturnsServiceState() throws Exception {
State.RUNNING,
job.getStateWithRetriesOrUnknownOnException(
BackOffAdapter.toGcpBackOff(DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff()),
fastClock));
fastClock::sleep));
}

@Test
Expand All @@ -335,7 +340,7 @@ public void testGetStateWithRetriesPassesExceptionThrough() throws Exception {
thrown.expect(IOException.class);
job.getStateWithRetries(
BackOffAdapter.toGcpBackOff(DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff()),
fastClock);
fastClock::sleep);
}

@Test
Expand All @@ -354,7 +359,7 @@ public void testGetStateNoThrowWithExceptionReturnsUnknown() throws Exception {
State.UNKNOWN,
job.getStateWithRetriesOrUnknownOnException(
BackOffAdapter.toGcpBackOff(DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff()),
fastClock));
fastClock::sleep));
long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);
checkValidInterval(
DataflowPipelineJob.STATUS_POLLING_INTERVAL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
import org.apache.beam.runners.dataflow.util.PackageUtil.PackageAttributes;
import org.apache.beam.runners.dataflow.util.PackageUtil.StagedFile;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.extensions.gcp.util.FastNanoClockAndSleeper;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.StorageObjectOrIOException;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
Expand All @@ -79,6 +78,7 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.RegexMatcher;
import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.util.ZipFiles;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -413,7 +413,7 @@ public void testPackageUploadFailsWhenIOExceptionThrown() throws Exception {
directPackageUtil.stageClasspathElements(
ImmutableList.of(makeStagedFile(tmpFile.getAbsolutePath())),
STAGING_PATH,
fastNanoClockAndSleeper,
fastNanoClockAndSleeper::sleep,
createOptions);
} finally {
verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
Expand Down Expand Up @@ -441,7 +441,7 @@ public void testPackageUploadFailsWithPermissionsErrorGivesDetailedMessage() thr
directPackageUtil.stageClasspathElements(
ImmutableList.of(makeStagedFile(tmpFile.getAbsolutePath())),
STAGING_PATH,
fastNanoClockAndSleeper,
fastNanoClockAndSleeper::sleep,
createOptions);
fail("Expected RuntimeException");
} catch (RuntimeException e) {
Expand Down Expand Up @@ -484,7 +484,7 @@ public void testPackageUploadEventuallySucceeds() throws Exception {
directPackageUtil.stageClasspathElements(
ImmutableList.of(makeStagedFile(tmpFile.getAbsolutePath())),
STAGING_PATH,
fastNanoClockAndSleeper,
fastNanoClockAndSleeper::sleep,
createOptions);
} finally {
verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
import java.util.ArrayList;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.util.TimeUtil;
import org.apache.beam.sdk.extensions.gcp.util.FastNanoClockAndSleeper;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.hamcrest.Description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.worker.testing.RestoreDataflowLoggingMDC;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.extensions.gcp.util.FastNanoClockAndSleeper;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.RestoreSystemProperties;
import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@
import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC;
import org.apache.beam.runners.dataflow.worker.testing.RestoreDataflowLoggingMDC;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.extensions.gcp.util.FastNanoClockAndSleeper;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.RestoreSystemProperties;
import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.util;

/**
* Nano clock which can be used to measure elapsed time in nanoseconds.
*
* <p>The default system implementation can be accessed at {@link #SYSTEM}. Alternative
* implementations may be used for testing.
*/
@FunctionalInterface
interface NanoClock {

/**
* Returns the current value of the most precise available system timer, in nanoseconds for use to
* measure elapsed time, to match the behavior of {@link System#nanoTime()}.
*/
long nanoTime();

/**
* Provides the default System implementation of a nano clock by using {@link System#nanoTime()}.
*/
NanoClock SYSTEM = System::nanoTime;
}
Loading

0 comments on commit 686bdb9

Please sign in to comment.