From 3726b9798c2fcc120ecb6715cc8864de2b6707af Mon Sep 17 00:00:00 2001 From: idv Date: Mon, 13 May 2019 17:26:08 -0700 Subject: [PATCH] Add property to parallelize GCS requests in `listStatus` and `getFileStatus` methods Note: this is essentially the same change as in [] that triggered omg/12873 in the past, but it has a feature flag that turns it off by default and tests that assert the number of GCS requests when parallelism is enabled. In the worst case `getFileStatus` method can make up to 3 sequential requests to GCS to get implicit directory status. After moving implicit directory repair from list to delete/rename operations this worst case could be more frequent than before, because there is a higher chance to encounter an implicit non-repaired directory: https://github.com/GoogleCloudPlatform/bigdata-interop/pull/156 This CL adds an option to execute these GCS requests in parallel which could reduce latency by up to 3 times. Change on 2019/05/13 by idv ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=248044068 --- gcs/CHANGES.md | 9 + .../GoogleHadoopFileSystemConfiguration.java | 11 +- .../gcsio/GoogleCloudStorageFileSystem.java | 37 +- .../GoogleCloudStorageFileSystemOptions.java | 24 +- ...udStorageFileSystemNewIntegrationTest.java | 230 ++++++- .../GoogleCloudStorageIntegrationHelper.java | 6 + .../gcsio/TrackingHttpRequestInitializer.java | 22 +- .../hadoop/util/LazyExecutorService.java | 592 ++++++++++++++++++ 8 files changed, 906 insertions(+), 25 deletions(-) create mode 100644 util/src/main/java/com/google/cloud/hadoop/util/LazyExecutorService.java diff --git a/gcs/CHANGES.md b/gcs/CHANGES.md index 35337c785d..1af928d7a5 100644 --- a/gcs/CHANGES.md +++ b/gcs/CHANGES.md @@ -34,6 +34,15 @@ 1. Remove obsolete `fs.gs.performance.cache.dir.metadata.prefetch.limit` property. +1. 
Add a property to parallelize GCS requests in `getFileStatus` and + `listStatus` methods to reduce latency: + + fs.gs.status.parallel.enable (default: false) + + Setting this property to `true` will cause GCS connector to send more GCS + requests which will decrease latency but also increase cost of + `getFileStatus` and `listStatus` method calls. + ### 1.9.14 - 2019-02-13 1. Implement Hadoop File System `concat` method using GCS compose API. diff --git a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java index 9a8387c85e..6f16564b76 100644 --- a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java +++ b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java @@ -203,6 +203,14 @@ public class GoogleHadoopFileSystemConfiguration { "fs.gs.performance.cache.list.caching.enable", PerformanceCachingGoogleCloudStorageOptions.LIST_CACHING_ENABLED); + /** + * If true, executes GCS requests in {@code listStatus} and {@code getFileStatus} methods in + * parallel to reduce latency. + */ + public static final GoogleHadoopFileSystemConfigurationProperty + GCS_STATUS_PARALLEL_ENABLE = + new GoogleHadoopFileSystemConfigurationProperty<>("fs.gs.status.parallel.enable", false); + /** * Configuration key for whether or not we should update timestamps for parent directories when we * create new files in them. 
@@ -498,7 +506,8 @@ static GoogleCloudStorageFileSystemOptions.Builder getGcsFsOptionsBuilder(Config .setMarkerFilePattern(GCS_MARKER_FILE_PATTERN.get(config, config::get)) .setIsPerformanceCacheEnabled( GCS_PERFORMANCE_CACHE_ENABLE.get(config, config::getBoolean)) - .setImmutablePerformanceCachingOptions(getPerformanceCachingOptions(config)); + .setImmutablePerformanceCachingOptions(getPerformanceCachingOptions(config)) + .setStatusParallelEnabled(GCS_STATUS_PARALLEL_ENABLE.get(config, config::getBoolean)); String projectId = GCS_PROJECT_ID.get(config, config::get); gcsFsOptionsBuilder diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageFileSystem.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageFileSystem.java index 6f6af1890d..635a49ec89 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageFileSystem.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageFileSystem.java @@ -29,6 +29,7 @@ import com.google.api.client.util.Clock; import com.google.cloud.hadoop.gcsio.GoogleCloudStorage.ListPage; import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystemOptions.TimestampUpdatePredicate; +import com.google.cloud.hadoop.util.LazyExecutorService; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.CharMatcher; import com.google.common.base.Preconditions; @@ -1055,11 +1056,20 @@ public List listFileInfo(URI path) throws IOException { StorageResourceId dirId = pathCodec.validatePathAndGetId(FileInfo.convertToDirectoryPath(pathCodec, path), true); - // To improve performance start to list directory items right away. - ExecutorService dirExecutor = Executors.newSingleThreadExecutor(DAEMON_THREAD_FACTORY); + ExecutorService dirExecutor = + options.isStatusParallelEnabled() + ? 
Executors.newFixedThreadPool(2, DAEMON_THREAD_FACTORY) + : new LazyExecutorService(); try { Future dirFuture = dirExecutor.submit(() -> gcs.getItemInfo(dirId)); + Future> dirChildrenFutures = + dirExecutor.submit( + () -> + dirId.isRoot() + ? gcs.listBucketInfo() + : gcs.listObjectInfo( + dirId.getBucketName(), dirId.getObjectName(), PATH_DELIMITER)); dirExecutor.shutdown(); if (!pathId.isDirectory()) { @@ -1073,10 +1083,7 @@ public List listFileInfo(URI path) throws IOException { try { GoogleCloudStorageItemInfo dirInfo = dirFuture.get(); - List dirItemInfos = - dirId.isRoot() - ? gcs.listBucketInfo() - : gcs.listObjectInfo(dirId.getBucketName(), dirId.getObjectName(), PATH_DELIMITER); + List dirItemInfos = dirChildrenFutures.get(); if (!dirInfo.exists() && dirItemInfos.isEmpty()) { throw new FileNotFoundException("Item not found: " + path); } @@ -1124,9 +1131,19 @@ private GoogleCloudStorageItemInfo getFileInfoInternal( return gcs.getItemInfo(resourceId); } StorageResourceId dirId = FileInfo.convertToDirectoryPath(resourceId); - // To improve performance get directory and its child right away. - ExecutorService dirExecutor = Executors.newSingleThreadExecutor(DAEMON_THREAD_FACTORY); + + ExecutorService dirExecutor = + options.isStatusParallelEnabled() + ? resourceId.isDirectory() + ? Executors.newSingleThreadExecutor(DAEMON_THREAD_FACTORY) + : Executors.newFixedThreadPool(2, DAEMON_THREAD_FACTORY) + : new LazyExecutorService(); try { + Future> dirChildFuture = + dirExecutor.submit( + () -> + gcs.listObjectNames( + dirId.getBucketName(), dirId.getObjectName(), PATH_DELIMITER, 1)); Future dirFuture = resourceId.isDirectory() ? 
Futures.immediateFuture(gcs.getItemInfo(resourceId)) @@ -1146,9 +1163,7 @@ private GoogleCloudStorageItemInfo getFileInfoInternal( return dirInfo; } - List dirChild = - gcs.listObjectNames(dirId.getBucketName(), dirId.getObjectName(), PATH_DELIMITER, 1); - if (dirChild.isEmpty()) { + if (dirChildFuture.get().isEmpty()) { return GoogleCloudStorageItemInfo.createNotFound(resourceId); } diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageFileSystemOptions.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageFileSystemOptions.java index 9dd3106ba8..76b9fe8873 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageFileSystemOptions.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageFileSystemOptions.java @@ -64,6 +64,7 @@ public boolean shouldUpdateTimestamp(URI item) { private PathCodec pathCodec = GoogleCloudStorageFileSystem.URI_ENCODED_PATH_CODEC; private boolean enableBucketDelete = false; private String markerFilePattern = null; + private boolean statusParallelEnabled = false; public Builder setIsPerformanceCacheEnabled(boolean performanceCacheEnabled) { this.performanceCacheEnabled = performanceCacheEnabled; @@ -180,6 +181,15 @@ public Builder setMarkerFilePattern(String markerFilePattern) { return this; } + /** + * Enables parallel execution of GCS requests in {@code listFileInfo} and {@code getFileInfo} + * methods to reduce latency. 
+ */ + public Builder setStatusParallelEnabled(boolean statusParallelEnabled) { + this.statusParallelEnabled = statusParallelEnabled; + return this; + } + public GoogleCloudStorageFileSystemOptions build() { return new GoogleCloudStorageFileSystemOptions( immutablePerformanceCacheOptions != null @@ -192,7 +202,8 @@ public GoogleCloudStorageFileSystemOptions build() { shouldIncludeInTimestampUpdatesPredicate, pathCodec, enableBucketDelete, - markerFilePattern); + markerFilePattern, + statusParallelEnabled); } } @@ -207,6 +218,7 @@ public static Builder newBuilder() { private final PathCodec pathCodec; private final boolean enableBucketDelete; private final Pattern markerFilePattern; + private final boolean statusParallelEnabled; public GoogleCloudStorageFileSystemOptions( PerformanceCachingGoogleCloudStorageOptions performanceCacheOptions, @@ -215,7 +227,8 @@ public GoogleCloudStorageFileSystemOptions( TimestampUpdatePredicate shouldIncludeInTimestampUpdatesPredicate, PathCodec pathCodec, boolean enableBucketDelete, - String markerFilePattern) { + String markerFilePattern, + boolean statusParallelEnabled) { this.performanceCacheOptions = performanceCacheOptions; this.performanceCacheEnabled = performanceCacheEnabled; this.cloudStorageOptions = cloudStorageOptions; @@ -223,6 +236,7 @@ public GoogleCloudStorageFileSystemOptions( this.pathCodec = pathCodec; this.enableBucketDelete = enableBucketDelete; this.markerFilePattern = Pattern.compile("^(.+/)?" + markerFilePattern + "$"); + this.statusParallelEnabled = statusParallelEnabled; } public PerformanceCachingGoogleCloudStorageOptions getPerformanceCacheOptions() { @@ -253,11 +267,15 @@ public Pattern getMarkerFilePattern() { return markerFilePattern; } + public boolean isStatusParallelEnabled() { + return statusParallelEnabled; + } + public void throwIfNotValid() { checkNotNull( shouldIncludeInTimestampUpdatesPredicate, "Predicate for ignored directory updates should not be null." 
- + " Consider Predicates.alwasyTrue"); + + " Consider Predicates.alwaysTrue"); cloudStorageOptions.throwIfNotValid(); } } diff --git a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageFileSystemNewIntegrationTest.java b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageFileSystemNewIntegrationTest.java index b1e576f941..737e3ecce8 100644 --- a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageFileSystemNewIntegrationTest.java +++ b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageFileSystemNewIntegrationTest.java @@ -16,9 +16,12 @@ package com.google.cloud.hadoop.gcsio; +import static com.google.cloud.hadoop.gcsio.TrackingHttpRequestInitializer.getRequestString; +import static com.google.cloud.hadoop.gcsio.TrackingHttpRequestInitializer.listRequestString; import static com.google.cloud.hadoop.gcsio.TrackingHttpRequestInitializer.uploadRequestString; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.truth.Truth.assertThat; +import static java.nio.charset.StandardCharsets.UTF_8; import com.google.api.client.auth.oauth2.Credential; import com.google.cloud.hadoop.gcsio.integration.GoogleCloudStorageTestHelper; @@ -26,6 +29,8 @@ import com.google.cloud.hadoop.util.RetryHttpInitializer; import java.io.IOException; import java.net.URI; +import java.net.URLEncoder; +import java.util.List; import java.util.UUID; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -81,11 +86,9 @@ public static void afterClass() throws Throwable { @Test public void mkdirs_shouldCreateNewDirectory() throws Exception { - GoogleCloudStorageFileSystemOptions gcsFsOptions = newGcsFsOptions(); - TrackingHttpRequestInitializer gcsRequestsTracker = new TrackingHttpRequestInitializer(httpRequestsInitializer); - GoogleCloudStorageFileSystem gcsFs = newGcsFs(gcsFsOptions, gcsRequestsTracker); + GoogleCloudStorageFileSystem gcsFs = newGcsFs(newGcsFsOptions().build(), 
gcsRequestsTracker); String bucketName = gcsfsIHelper.sharedBucketName1; String dirObject = "mkdirs_shouldCreateNewDirectory_" + UUID.randomUUID(); @@ -94,16 +97,229 @@ public void mkdirs_shouldCreateNewDirectory() throws Exception { gcsFs.mkdir(dirObjectUri); assertThat(gcsRequestsTracker.getAllRequestStrings()) - .containsExactly(uploadRequestString(bucketName, dirObject)); + .containsExactly(uploadRequestString(bucketName, dirObject + "/")); assertThat(gcsFs.exists(dirObjectUri)).isTrue(); assertThat(gcsFs.getFileInfo(dirObjectUri).isDirectory()).isTrue(); } - private GoogleCloudStorageFileSystemOptions newGcsFsOptions() { + @Test + public void getFileInfo_sequential() throws Exception { + TrackingHttpRequestInitializer gcsRequestsTracker = + new TrackingHttpRequestInitializer(httpRequestsInitializer); + GoogleCloudStorageFileSystem gcsFs = newGcsFs(newGcsFsOptions().build(), gcsRequestsTracker); + + String bucketName = gcsfsIHelper.sharedBucketName1; + String dirObject = "getFileInfo_sequential_" + UUID.randomUUID(); + URI dirObjectUri = new URI("gs://" + bucketName).resolve("/" + dirObject); + + gcsfsIHelper.createObjectsWithSubdirs( + bucketName, dirObject + "/file1", dirObject + "/file2", dirObject + "/file3"); + + FileInfo dirInfo = gcsFs.getFileInfo(dirObjectUri); + + assertThat(gcsRequestsTracker.getAllRequestStrings()) + .containsExactly( + getRequestString(bucketName, dirObject), + getRequestString(bucketName, dirObject + URLEncoder.encode("/", UTF_8.name()))); + + assertThat(dirInfo.exists()).isTrue(); + assertThat(dirInfo.getPath().toString()).isEqualTo(dirObjectUri + "/"); + } + + @Test + public void getFileInfo_parallel() throws Exception { + GoogleCloudStorageFileSystemOptions gcsFsOptions = + newGcsFsOptions().setStatusParallelEnabled(true).build(); + + TrackingHttpRequestInitializer gcsRequestsTracker = + new TrackingHttpRequestInitializer(httpRequestsInitializer); + GoogleCloudStorageFileSystem gcsFs = newGcsFs(gcsFsOptions, gcsRequestsTracker); 
+ + String bucketName = gcsfsIHelper.sharedBucketName1; + String dirObject = "getFileInfo_parallel_" + UUID.randomUUID(); + URI dirObjectUri = new URI("gs://" + bucketName).resolve("/" + dirObject); + + gcsfsIHelper.createObjectsWithSubdirs( + bucketName, dirObject + "/file1", dirObject + "/file2", dirObject + "/file3"); + + FileInfo dirInfo = gcsFs.getFileInfo(dirObjectUri); + + assertThat(gcsRequestsTracker.getAllRequestStrings()) + .containsExactly( + getRequestString(bucketName, dirObject), + getRequestString(bucketName, dirObject + URLEncoder.encode("/", UTF_8.name())), + listRequestString( + bucketName, dirObject + "/", /* maxResults= */ 2, /* pageToken= */ null)); + + assertThat(dirInfo.exists()).isTrue(); + assertThat(dirInfo.getPath().toString()).isEqualTo(dirObjectUri + "/"); + } + + @Test + public void getDirInfo_sequential() throws Exception { + TrackingHttpRequestInitializer gcsRequestsTracker = + new TrackingHttpRequestInitializer(httpRequestsInitializer); + GoogleCloudStorageFileSystem gcsFs = newGcsFs(newGcsFsOptions().build(), gcsRequestsTracker); + + String bucketName = gcsfsIHelper.sharedBucketName1; + String dirObject = "getDirInfo_sequential_" + UUID.randomUUID(); + URI dirObjectUri = new URI("gs://" + bucketName).resolve("/" + dirObject); + + gcsfsIHelper.createObjectsWithSubdirs( + bucketName, dirObject + "/file1", dirObject + "/file2", dirObject + "/file3"); + + FileInfo dirInfo = gcsFs.getFileInfo(new URI("gs://" + bucketName).resolve(dirObject + "/")); + + assertThat(gcsRequestsTracker.getAllRequestStrings()) + .containsExactly( + getRequestString(bucketName, dirObject + URLEncoder.encode("/", UTF_8.name()))); + + assertThat(dirInfo.exists()).isTrue(); + assertThat(dirInfo.getPath().toString()).isEqualTo(dirObjectUri + "/"); + } + + @Test + public void getDirInfo_parallel() throws Exception { + GoogleCloudStorageFileSystemOptions gcsFsOptions = + newGcsFsOptions().setStatusParallelEnabled(true).build(); + + TrackingHttpRequestInitializer 
gcsRequestsTracker = + new TrackingHttpRequestInitializer(httpRequestsInitializer); + GoogleCloudStorageFileSystem gcsFs = newGcsFs(gcsFsOptions, gcsRequestsTracker); + + String bucketName = gcsfsIHelper.sharedBucketName1; + String dirObject = "getDirInfo_parallel_" + UUID.randomUUID(); + URI dirObjectUri = new URI("gs://" + bucketName).resolve("/" + dirObject); + + gcsfsIHelper.createObjectsWithSubdirs( + bucketName, dirObject + "/file1", dirObject + "/file2", dirObject + "/file3"); + + FileInfo dirInfo = gcsFs.getFileInfo(new URI("gs://" + bucketName).resolve(dirObject + "/")); + + assertThat(gcsRequestsTracker.getAllRequestStrings()) + .containsExactly( + getRequestString(bucketName, dirObject + URLEncoder.encode("/", UTF_8.name())), + listRequestString( + bucketName, dirObject + "/", /* maxResults= */ 2, /* pageToken= */ null)); + + assertThat(dirInfo.exists()).isTrue(); + assertThat(dirInfo.getPath().toString()).isEqualTo(dirObjectUri + "/"); + } + + @Test + public void listFileInfo_file_sequential() throws Exception { + TrackingHttpRequestInitializer gcsRequestsTracker = + new TrackingHttpRequestInitializer(httpRequestsInitializer); + GoogleCloudStorageFileSystem gcsFs = newGcsFs(newGcsFsOptions().build(), gcsRequestsTracker); + + String bucketName = gcsfsIHelper.sharedBucketName1; + String fileObject = "listFileInfo_file_sequential_" + UUID.randomUUID(); + URI fileObjectUri = new URI("gs://" + bucketName).resolve(fileObject); + + gcsfsIHelper.createObjectsWithSubdirs(bucketName, fileObject); + + List fileInfos = gcsFs.listFileInfo(fileObjectUri); + + assertThat(gcsRequestsTracker.getAllRequestStrings()) + .containsExactly(getRequestString(bucketName, fileObject)); + + assertThat(fileInfos).hasSize(1); + } + + @Test + public void listFileInfo_file_parallel() throws Exception { + GoogleCloudStorageFileSystemOptions gcsFsOptions = + newGcsFsOptions().setStatusParallelEnabled(true).build(); + + TrackingHttpRequestInitializer gcsRequestsTracker = + new 
TrackingHttpRequestInitializer(httpRequestsInitializer); + GoogleCloudStorageFileSystem gcsFs = newGcsFs(gcsFsOptions, gcsRequestsTracker); + + String bucketName = gcsfsIHelper.sharedBucketName1; + String fileObject = "listFileInfo_file_parallel_" + UUID.randomUUID(); + URI fileObjectUri = new URI("gs://" + bucketName).resolve(fileObject); + + gcsfsIHelper.createObjectsWithSubdirs(bucketName, fileObject); + + List fileInfos = gcsFs.listFileInfo(fileObjectUri); + + assertThat(gcsRequestsTracker.getAllRequestStrings()) + .containsExactly( + getRequestString(bucketName, fileObject), + getRequestString(bucketName, fileObject + URLEncoder.encode("/", UTF_8.name())), + listRequestString( + bucketName, + /* includeTrailingDelimiter= */ true, + fileObject + "/", + /* maxResults= */ 1024, + /* pageToken= */ null)); + + assertThat(fileInfos).hasSize(1); + } + + @Test + public void listFileInfo_directory_sequential() throws Exception { + TrackingHttpRequestInitializer gcsRequestsTracker = + new TrackingHttpRequestInitializer(httpRequestsInitializer); + GoogleCloudStorageFileSystem gcsFs = newGcsFs(newGcsFsOptions().build(), gcsRequestsTracker); + + String bucketName = gcsfsIHelper.sharedBucketName1; + String dirObject = "listFileInfo_directory_sequential_" + UUID.randomUUID(); + URI dirObjectUri = new URI("gs://" + bucketName).resolve(dirObject); + + gcsfsIHelper.createObjectsWithSubdirs(bucketName, dirObject + "/file1", dirObject + "/file2"); + + List fileInfos = gcsFs.listFileInfo(dirObjectUri); + + assertThat(gcsRequestsTracker.getAllRequestStrings()) + .containsExactly( + getRequestString(bucketName, dirObject), + getRequestString(bucketName, dirObject + URLEncoder.encode("/", UTF_8.name())), + listRequestString( + bucketName, + /* includeTrailingDelimiter= */ true, + dirObject + "/", + /* maxResults= */ 1024, + /* pageToken= */ null)); + + assertThat(fileInfos).hasSize(2); + } + + @Test + public void listFileInfo_directory_parallel() throws Exception { + 
GoogleCloudStorageFileSystemOptions gcsFsOptions = + newGcsFsOptions().setStatusParallelEnabled(true).build(); + + TrackingHttpRequestInitializer gcsRequestsTracker = + new TrackingHttpRequestInitializer(httpRequestsInitializer); + GoogleCloudStorageFileSystem gcsFs = newGcsFs(gcsFsOptions, gcsRequestsTracker); + + String bucketName = gcsfsIHelper.sharedBucketName1; + String dirObject = "listFileInfo_directory_parallel_" + UUID.randomUUID(); + URI dirObjectUri = new URI("gs://" + bucketName).resolve(dirObject); + + gcsfsIHelper.createObjectsWithSubdirs(bucketName, dirObject + "/file1", dirObject + "/file2"); + + List fileInfos = gcsFs.listFileInfo(dirObjectUri); + + assertThat(gcsRequestsTracker.getAllRequestStrings()) + .containsExactly( + getRequestString(bucketName, dirObject), + getRequestString(bucketName, dirObject + URLEncoder.encode("/", UTF_8.name())), + listRequestString( + bucketName, + /* includeTrailingDelimiter= */ true, + dirObject + "/", + /* maxResults= */ 1024, + /* pageToken= */ null)); + + assertThat(fileInfos).hasSize(2); + } + + private GoogleCloudStorageFileSystemOptions.Builder newGcsFsOptions() { return GoogleCloudStorageFileSystemOptions.newBuilder() - .setCloudStorageOptionsBuilder(gcsOptions.toBuilder()) - .build(); + .setCloudStorageOptionsBuilder(gcsOptions.toBuilder()); } private GoogleCloudStorageFileSystem newGcsFs( diff --git a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageIntegrationHelper.java b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageIntegrationHelper.java index 0a8e8afb67..50c4e52e12 100644 --- a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageIntegrationHelper.java +++ b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageIntegrationHelper.java @@ -14,6 +14,8 @@ package com.google.cloud.hadoop.gcsio; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Strings.isNullOrEmpty; import static 
com.google.common.truth.Truth.assertWithMessage; import static java.nio.charset.StandardCharsets.UTF_8; @@ -335,6 +337,10 @@ public void createObjectsWithSubdirs(String bucketName, String... objectNames) * */ private List getSubdirs(String objectName) { + checkArgument( + isNullOrEmpty(objectName) || objectName.charAt(0) != '/', + "objectName can not start from '/': %s", + objectName); List subdirs = new ArrayList<>(); // Create a list of all subdirs. // for example, diff --git a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/TrackingHttpRequestInitializer.java b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/TrackingHttpRequestInitializer.java index 807c18b310..6a3f17e03b 100644 --- a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/TrackingHttpRequestInitializer.java +++ b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/TrackingHttpRequestInitializer.java @@ -31,6 +31,9 @@ public class TrackingHttpRequestInitializer implements HttpRequestInitializer { + private static final String GET_REQUEST_FORMAT = + "GET:https://www.googleapis.com/storage/v1/b/%s/o/%s"; + private static final String UPLOAD_REQUEST_FORMAT = "POST:https://www.googleapis.com/upload/storage/v1/b/%s/o?uploadType=multipart:%s"; @@ -80,13 +83,26 @@ public void reset() { requests.clear(); } - public static String uploadRequestString(String bucketName, String dirObject) { - return String.format(UPLOAD_REQUEST_FORMAT, bucketName, dirObject + "/"); + public static String getRequestString(String bucketName, String object) { + return String.format(GET_REQUEST_FORMAT, bucketName, object); + } + + public static String uploadRequestString(String bucketName, String object) { + return String.format(UPLOAD_REQUEST_FORMAT, bucketName, object); } public static String listRequestString( String bucket, String prefix, int maxResults, String pageToken) { - boolean includeTrailingDelimiter = false; + return listRequestString( + bucket, /* includeTrailingDelimiter= */ false, prefix, maxResults, pageToken); + } 
+ + public static String listRequestString( + String bucket, + boolean includeTrailingDelimiter, + String prefix, + int maxResults, + String pageToken) { String pageTokenParam = pageToken == null ? "" : "&pageToken=" + pageToken; return String.format( LIST_REQUEST_FORMAT, bucket, includeTrailingDelimiter, maxResults, pageTokenParam, prefix); diff --git a/util/src/main/java/com/google/cloud/hadoop/util/LazyExecutorService.java b/util/src/main/java/com/google/cloud/hadoop/util/LazyExecutorService.java new file mode 100644 index 0000000000..5450836390 --- /dev/null +++ b/util/src/main/java/com/google/cloud/hadoop/util/LazyExecutorService.java @@ -0,0 +1,592 @@ +// Copyright 2007 Google Inc. All Rights Reserved. + +package com.google.cloud.hadoop.util; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +import com.google.common.annotations.GwtIncompatible; +import com.google.common.collect.Lists; +import com.google.common.collect.MapMaker; +import com.google.common.util.concurrent.ForwardingFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; +import javax.annotation.Nullable; + +/** + * Defers execution to the time that a method that expresses interest in the result 
(get or isDone) + * is called on the Future. Execution is performed by a backing ExecutorService. + * + *

In essence, a returned Future represents a "canned" method call and once the call has been + * performed, the Future returns the cached result. + * + *

Both this class and the returned Future are thread-safe. + * + * @author tobe@google.com (Torbjorn Gannholm) + * @author jlevy@google.com (Jared Levy) + * @author cpovirk@google.com (Chris Povirk) + */ +@GwtIncompatible +public final class LazyExecutorService implements ExecutorService { + private volatile boolean shutdown = false; + private final ExecutorService backingService; + private final CountDownLatch terminated = new CountDownLatch(1); + + /** + * Creates an instance using a {@link MoreExecutors#newDirectExecutorService()} for the backing + * service. + */ + public LazyExecutorService() { + backingService = MoreExecutors.newDirectExecutorService(); + } + + /** + * Creates an instance using the given {@code ExecutorService} as the backing service. + * + *

The backing service will only be used to execute tasks and it may be shared by several + * instances or used for other purposes. Shutdowns of this instance will not shut down the backing + * service. + * + *

If you shut down the backing service, this instance will be shut down automatically and all + * tasks submitted to this instance that have not yet been submitted to the backing service will + * be considered cancelled. + */ + public LazyExecutorService(ExecutorService backingService) { + this.backingService = backingService; + } + + /** + * A set of all submitted uncompleted tasks so that we can cancel them on {@code shutdownNow()}. + * The tasks need to be wrapped in weak references so that tasks that are just dropped can be + * gc:ed. The set needs to be safe for concurrent access. + */ + private final Set> pendingTasks = + Collections.newSetFromMap(new MapMaker().weakKeys()., Boolean>makeMap()); + + /** + * Manages compound conditions involving changing the size of {@code pendingTasks} and the value + * of {@code shutdown}. + */ + private final ReentrantLock tasksAndTerminationLock = new ReentrantLock(); + + /** + * Should be called when a task is completed or cancelled. + * + * @param f The completed or cancelled task to remove. + */ + private void removePendingTask(ExecutingFutureImpl f) { + pendingTasks.remove(f); + updateTerminationState(); + } + + private void updateTerminationState() { + tasksAndTerminationLock.lock(); + try { + if (shutdown && pendingTasks.isEmpty()) { + terminated.countDown(); + } + } finally { + tasksAndTerminationLock.unlock(); + } + } + + /** Shuts this service down, but leaves the backing service untouched. */ + @Override + public void shutdown() { + shutdown = true; + updateTerminationState(); + } + + /** + * Trying to interpret the assumptions about the contract of this method in the light of this + * implementation, it seems most reasonable to take the view that all tasks are running, even if + * the processing has not actually started. Therefore, unfinished tasks will be cancelled and an + * empty list will be returned. 
+ */ + @CanIgnoreReturnValue + @Override + public List shutdownNow() { + shutdown(); + // Cancel all unfinished tasks. + // Get a snapshot because future.cancel modifies pendingTasks. + Future[] runningTasks = pendingTasks.toArray(new Future[0]); + for (Future future : runningTasks) { + // Cancel may not succeed, but it's best effort. + future.cancel(true); + } + return Lists.newLinkedList(); + } + + @Override + public boolean isShutdown() { + checkBackingService(); + return shutdown; + } + + /** + * Checks if this service has been implicitly shut down through a shutdown on the backing service + * and make the state reflect that. + */ + private void checkBackingService() { + if (backingService.isShutdown()) { + // This service is logically also shut down. + shutdown(); + // Notify the unfinished Futures. + ExecutingFuture[] runningTasks = pendingTasks.toArray(new ExecutingFuture[0]); + for (ExecutingFuture future : runningTasks) { + future.backingServiceDied(); + } + } + } + + @Override + public boolean isTerminated() { + return isShutdown() && terminated.getCount() == 0; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + if (isTerminated()) { + return true; + } + return terminated.await(timeout, unit); + } + + @Override + public Future submit(final Callable task) { + checkNotNull(task, "Null task submitted."); + tasksAndTerminationLock.lock(); + try { + if (isShutdown()) { + throw new RejectedExecutionException("ExecutorService is shutdown"); + } + ExecutingFuture future = new ExecutingFutureImpl(task); + pendingTasks.add(future); + return future; + } finally { + tasksAndTerminationLock.unlock(); + } + } + + @Override + public Future submit(Runnable task, T result) { + return submit(Executors.callable(task, result)); + } + + @Override + public Future submit(Runnable command) { + return submit(Executors.callable(command)); + } + + /** + * ExecutorService requires that this method should not return until 
all tasks are completed, + * which precludes lazy execution. Tasks are run in parallel, as far as the backing service + * allows. + * + *

This method makes sense from a cached result perspective but not from a lazy execution + * perspective. + */ + @CanIgnoreReturnValue + @Override + public List> invokeAll(Collection> tasks) + throws InterruptedException { + List> result = Lists.newLinkedList(); + try { + for (Callable task : tasks) { + result.add(submit(task)); + } + List> monitorTasks = createMonitorTasksFor(result, 0, null); + backingService.invokeAll(monitorTasks); + } finally { + // Clean up. These are no-ops for completed tasks. + for (Future future : result) { + future.cancel(true); + } + } + return result; + } + + /** + * ExecutorService requires that this method should not return until all tasks are completed or + * the timeout expires, which precludes lazy execution. Tasks are run in parallel, as far as the + * backing service allows. Timeout is done as a best-effort in case of the default same thread + * executor. + * + *

This method makes sense from a cached result perspective but not from a lazy execution + * perspective. + */ + @CanIgnoreReturnValue + @Override + public List> invokeAll( + Collection> tasks, final long timeout, final TimeUnit unit) + throws InterruptedException { + checkNotNull(unit); + final List> result = Lists.newLinkedList(); + try { + for (Callable task : tasks) { + result.add(submit(task)); + } + List> monitorTasks = createMonitorTasksFor(result, timeout, unit); + backingService.invokeAll(monitorTasks, timeout, unit); + } finally { + // Clean up, even when interrupted. These are no-ops on normal exit. + for (Future future : result) { + future.cancel(true); + } + } + return result; + } + + private static List> createMonitorTasksFor( + List> futures, long timeout, @Nullable TimeUnit unit) { + List> monitorTasks = Lists.newLinkedList(); + // A null unit means 0 means "unbounded." + long deadline = unit == null ? 0 : System.nanoTime() + NANOSECONDS.convert(timeout, unit); + // We need to add tasks for both starting and checking completion. + // In the case of a direct executor, the starting tasks will be slow + // and actually perform the task, while the checks are instant. + // In the case of a ThreadPoolExecutor, the start tasks are instant and + // the checks await completion. + // We want to add all the start tasks before the completion check tasks. + // TODO(user): This assumes tasks are executed in order. Verify or fix. 
+ for (Future future : futures) { + monitorTasks.add(new StartExecutionTask(future)); + } + for (Future future : futures) { + monitorTasks.add(new CompletionCheckTask(future, deadline)); + } + return monitorTasks; + } + + private static class StartExecutionTask implements Callable { + private final Future future; + + StartExecutionTask(Future future) { + this.future = future; + } + + @Override + public Void call() { + future.isDone(); + return null; + } + } + + private static class CompletionCheckTask implements Callable { + private final Future future; + private final long deadline; + + CompletionCheckTask(Future future, long deadline) { + this.future = future; + this.deadline = deadline; + } + + @Override + public Void call() { + try { + if (deadline == 0) { + future.get(); + } else { + /* The timeout here is just a safeguard. The timing is really done + * in the invoking code. */ + future.get(deadline - System.nanoTime(), NANOSECONDS); + } + } catch (ExecutionException e) { + // We don't care at this point. + } catch (InterruptedException e) { + // Propagate the interrupt. + Thread.currentThread().interrupt(); + // Interrupt execution + future.cancel(true); + } catch (TimeoutException e) { + // Interrupt execution + future.cancel(true); + } + return null; + } + } + + /** + * Always throws a RejectedExecutionException because using this method does not make sense from + * either a lazy execution perspective or a cached result perspective. + */ + @Override + public T invokeAny(Collection> tasks) { + throw new RejectedExecutionException("Use another ExecutorService implementation."); + } + + /** + * Always throws a RejectedExecutionException because using this method does not make sense from + * either a lazy execution perspective or a cached result perspective. 
+ */ + @Override + public T invokeAny( + final Collection> tasks, long timeout, TimeUnit unit) { + throw new RejectedExecutionException("Use another ExecutorService implementation."); + } + + /** + * Always throws a RejectedExecutionException because using this method does not make sense from + * either a lazy execution perspective or a cached result perspective. + */ + @Override + public void execute(Runnable command) { + throw new RejectedExecutionException("Use submit instead of execute."); + } + + private static interface ExecutingFuture extends Future { + void backingServiceDied(); + } + + /** + * Executes the task when get() or isDone() are called, unless the job has been cancelled or the + * execution service is shutdown. + */ + private class ExecutingFutureImpl extends ForwardingFuture implements ExecutingFuture { + private final AtomicReference> state; + + ExecutingFutureImpl(Callable task) { + state = new AtomicReference>(new Created(task)); + } + + @Override + protected Future delegate() { + return state.get(); + } + + @Override + public void backingServiceDied() { + state.get().backingServiceDied(); + } + + /* + * States and transitions are defined such that they guarantee that only + * one thread will be changing the internal state of the object at a time + * and no thread will access inconsistent internal state. + * + *

Simple state changes are just a CAS of the AtomicReference holding the + * current state. Complex state changes are synchronized on the object whose + * state is being changed and the object is in the special InbetweenStates + * state until the internal state is consistent. + */ + + /** Initial state. */ + private class Created implements ExecutingFuture { + private final Callable task; + + Created(Callable task) { + this.task = task; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (transitionToCancelled()) { + return true; + } else { + return state.get().cancel(mayInterruptIfRunning); + } + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + transitionToDelegated(); + return state.get().isDone(); + } + + @Override + public T get() throws ExecutionException, InterruptedException { + transitionToDelegated(); + return state.get().get(); + } + + @Override + public T get(long timeout, TimeUnit unit) + throws ExecutionException, InterruptedException, TimeoutException { + transitionToDelegated(); + return state.get().get(timeout, unit); + } + + @Override + public void backingServiceDied() { + transitionToCancelled(); + } + + private void transitionToDelegated() { + InbetweenStates transitionGuard = new InbetweenStates(); + if (state.compareAndSet(this, transitionGuard)) { + try { + Future backingFuture = + backingService.submit( + new Callable() { + @Override + public T call() throws Exception { + try { + return task.call(); + } finally { + removePendingTask(ExecutingFutureImpl.this); + } + } + }); + state.set(new Delegated(backingFuture)); + } catch (RejectedExecutionException e) { + state.set(new Cancelled()); + removePendingTask(ExecutingFutureImpl.this); + checkBackingService(); + } finally { + transitionGuard.latch.countDown(); + } + } + } + + @CanIgnoreReturnValue + private boolean transitionToCancelled() { + if (state.compareAndSet(this, new Cancelled())) { + 
removePendingTask(ExecutingFutureImpl.this); + return true; + } + return false; + } + } + + /** Represents the state where the Future was cancelled before execution started. */ + private class Cancelled implements ExecutingFuture { + @Override + public boolean isDone() { + return true; + } + + @Override + public boolean isCancelled() { + return true; + } + + @Override + public T get(long timeout, TimeUnit unit) { + throw new CancellationException(); + } + + @Override + public T get() { + throw new CancellationException(); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public void backingServiceDied() {} + } + + /** Execution has started and everything is delegated to the backingFuture. */ + private class Delegated implements ExecutingFuture { + private final Future backingFuture; + + Delegated(Future backingFuture) { + this.backingFuture = backingFuture; + } + + @Override + public boolean isDone() { + return backingFuture.isDone(); + } + + @Override + public boolean isCancelled() { + return backingFuture.isCancelled(); + } + + @Override + public T get(long timeout, TimeUnit unit) + throws ExecutionException, InterruptedException, TimeoutException { + return backingFuture.get(timeout, unit); + } + + @Override + public T get() throws ExecutionException, InterruptedException { + return backingFuture.get(); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (!mayInterruptIfRunning) { + // Task is logically running already. + return false; + } + return backingFuture.cancel(mayInterruptIfRunning); + } + + @Override + public void backingServiceDied() {} + } + + /** + * Temporary state protecting a state transition until the object is in a consistent state. Each + * method here is synchronized and the thread performing the transition should always be first + * to hold the lock. 
+ */ + private class InbetweenStates implements ExecutingFuture { + public final CountDownLatch latch = new CountDownLatch(1); + + @Override + public boolean isCancelled() { + return false; // Can only be true in a terminal state. Not there yet. + } + + @Override + public boolean isDone() { + return false; // Can only be true in a terminal state. Not there yet. + } + + @Override + public T get(long timeout, TimeUnit unit) + throws ExecutionException, InterruptedException, TimeoutException { + long startWait = System.nanoTime(); + if (!latch.await(timeout, unit)) { + throw new TimeoutException(); + } + long endWait = System.nanoTime(); + timeout -= unit.convert(endWait - startWait, NANOSECONDS); + return state.get().get(timeout, unit); + } + + @Override + public T get() throws ExecutionException, InterruptedException { + latch.await(); + return state.get().get(); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; // Cancellation may fail for any reason. This is one. + } + + @Override + public void backingServiceDied() { + // Only relevant in the initial Created state, and we've left that. + } + } + } +}