Skip to content

Commit

Permalink
Add property to parallelize GCS requests in listStatus and `getFile…
Browse files Browse the repository at this point in the history
…Status` methods

Note: this is essentially the same change as in [] that triggered omg/12873 in the past, but it has feature flag that turns off it by default and tests that assert number of GCS requests when parallelism is enabled.

In the worst case `getFileStatus` method can make up to 3 sequential requests to GCS to get implicit directory status.

After moving implicit directory repair from list to delete/rename operations this worst case could be more frequent than before, because there higher chance to encounter implicit non-repaired directory:
#156

This CL adds an option to execute these GCS requests in parallel which could reduce latency by up to 3 times.

	Change on 2019/05/13 by idv <idv@google.com>

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=248044068
  • Loading branch information
medb committed May 14, 2019
1 parent 5057c8d commit 3726b97
Show file tree
Hide file tree
Showing 8 changed files with 906 additions and 25 deletions.
9 changes: 9 additions & 0 deletions gcs/CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@
1. Remove obsolete `fs.gs.performance.cache.dir.metadata.prefetch.limit`
property.

1. Add a property to parallelize GCS requests in `getFileStatus` and
`listStatus` methods to reduce latency:

fs.gs.status.parallel.enable (default: false)

Setting this property to `true` will cause GCS connector to send more GCS
requests which will decrease latency but also increase cost of
`getFileStatus` and `listStatus` method calls.

### 1.9.14 - 2019-02-13

1. Implement Hadoop File System `concat` method using GCS compose API.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,14 @@ public class GoogleHadoopFileSystemConfiguration {
"fs.gs.performance.cache.list.caching.enable",
PerformanceCachingGoogleCloudStorageOptions.LIST_CACHING_ENABLED);

/**
* If true, executes GCS requests in {@code listStatus} and {@code getFileStatus} methods in
* parallel to reduce latency.
*/
public static final GoogleHadoopFileSystemConfigurationProperty<Boolean>
GCS_STATUS_PARALLEL_ENABLE =
new GoogleHadoopFileSystemConfigurationProperty<>("fs.gs.status.parallel.enable", false);

/**
* Configuration key for whether or not we should update timestamps for parent directories when we
* create new files in them.
Expand Down Expand Up @@ -498,7 +506,8 @@ static GoogleCloudStorageFileSystemOptions.Builder getGcsFsOptionsBuilder(Config
.setMarkerFilePattern(GCS_MARKER_FILE_PATTERN.get(config, config::get))
.setIsPerformanceCacheEnabled(
GCS_PERFORMANCE_CACHE_ENABLE.get(config, config::getBoolean))
.setImmutablePerformanceCachingOptions(getPerformanceCachingOptions(config));
.setImmutablePerformanceCachingOptions(getPerformanceCachingOptions(config))
.setStatusParallelEnabled(GCS_STATUS_PARALLEL_ENABLE.get(config, config::getBoolean));

String projectId = GCS_PROJECT_ID.get(config, config::get);
gcsFsOptionsBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.api.client.util.Clock;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorage.ListPage;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystemOptions.TimestampUpdatePredicate;
import com.google.cloud.hadoop.util.LazyExecutorService;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CharMatcher;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -1055,11 +1056,20 @@ public List<FileInfo> listFileInfo(URI path) throws IOException {
StorageResourceId dirId =
pathCodec.validatePathAndGetId(FileInfo.convertToDirectoryPath(pathCodec, path), true);

// To improve performance start to list directory items right away.
ExecutorService dirExecutor = Executors.newSingleThreadExecutor(DAEMON_THREAD_FACTORY);
ExecutorService dirExecutor =
options.isStatusParallelEnabled()
? Executors.newFixedThreadPool(2, DAEMON_THREAD_FACTORY)
: new LazyExecutorService();
try {
Future<GoogleCloudStorageItemInfo> dirFuture =
dirExecutor.submit(() -> gcs.getItemInfo(dirId));
Future<List<GoogleCloudStorageItemInfo>> dirChildrenFutures =
dirExecutor.submit(
() ->
dirId.isRoot()
? gcs.listBucketInfo()
: gcs.listObjectInfo(
dirId.getBucketName(), dirId.getObjectName(), PATH_DELIMITER));
dirExecutor.shutdown();

if (!pathId.isDirectory()) {
Expand All @@ -1073,10 +1083,7 @@ public List<FileInfo> listFileInfo(URI path) throws IOException {

try {
GoogleCloudStorageItemInfo dirInfo = dirFuture.get();
List<GoogleCloudStorageItemInfo> dirItemInfos =
dirId.isRoot()
? gcs.listBucketInfo()
: gcs.listObjectInfo(dirId.getBucketName(), dirId.getObjectName(), PATH_DELIMITER);
List<GoogleCloudStorageItemInfo> dirItemInfos = dirChildrenFutures.get();
if (!dirInfo.exists() && dirItemInfos.isEmpty()) {
throw new FileNotFoundException("Item not found: " + path);
}
Expand Down Expand Up @@ -1124,9 +1131,19 @@ private GoogleCloudStorageItemInfo getFileInfoInternal(
return gcs.getItemInfo(resourceId);
}
StorageResourceId dirId = FileInfo.convertToDirectoryPath(resourceId);
// To improve performance get directory and its child right away.
ExecutorService dirExecutor = Executors.newSingleThreadExecutor(DAEMON_THREAD_FACTORY);

ExecutorService dirExecutor =
options.isStatusParallelEnabled()
? resourceId.isDirectory()
? Executors.newSingleThreadExecutor(DAEMON_THREAD_FACTORY)
: Executors.newFixedThreadPool(2, DAEMON_THREAD_FACTORY)
: new LazyExecutorService();
try {
Future<List<String>> dirChildFuture =
dirExecutor.submit(
() ->
gcs.listObjectNames(
dirId.getBucketName(), dirId.getObjectName(), PATH_DELIMITER, 1));
Future<GoogleCloudStorageItemInfo> dirFuture =
resourceId.isDirectory()
? Futures.immediateFuture(gcs.getItemInfo(resourceId))
Expand All @@ -1146,9 +1163,7 @@ private GoogleCloudStorageItemInfo getFileInfoInternal(
return dirInfo;
}

List<String> dirChild =
gcs.listObjectNames(dirId.getBucketName(), dirId.getObjectName(), PATH_DELIMITER, 1);
if (dirChild.isEmpty()) {
if (dirChildFuture.get().isEmpty()) {
return GoogleCloudStorageItemInfo.createNotFound(resourceId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public boolean shouldUpdateTimestamp(URI item) {
private PathCodec pathCodec = GoogleCloudStorageFileSystem.URI_ENCODED_PATH_CODEC;
private boolean enableBucketDelete = false;
private String markerFilePattern = null;
private boolean statusParallelEnabled = false;

public Builder setIsPerformanceCacheEnabled(boolean performanceCacheEnabled) {
this.performanceCacheEnabled = performanceCacheEnabled;
Expand Down Expand Up @@ -180,6 +181,15 @@ public Builder setMarkerFilePattern(String markerFilePattern) {
return this;
}

/**
* Enables parallel execution of GCS requests in {@code listFileInfo} and {@code getFileInfo}
* methods to reduce latency.
*/
public Builder setStatusParallelEnabled(boolean statusParallelEnabled) {
this.statusParallelEnabled = statusParallelEnabled;
return this;
}

public GoogleCloudStorageFileSystemOptions build() {
return new GoogleCloudStorageFileSystemOptions(
immutablePerformanceCacheOptions != null
Expand All @@ -192,7 +202,8 @@ public GoogleCloudStorageFileSystemOptions build() {
shouldIncludeInTimestampUpdatesPredicate,
pathCodec,
enableBucketDelete,
markerFilePattern);
markerFilePattern,
statusParallelEnabled);
}
}

Expand All @@ -207,6 +218,7 @@ public static Builder newBuilder() {
private final PathCodec pathCodec;
private final boolean enableBucketDelete;
private final Pattern markerFilePattern;
private final boolean statusParallelEnabled;

public GoogleCloudStorageFileSystemOptions(
PerformanceCachingGoogleCloudStorageOptions performanceCacheOptions,
Expand All @@ -215,14 +227,16 @@ public GoogleCloudStorageFileSystemOptions(
TimestampUpdatePredicate shouldIncludeInTimestampUpdatesPredicate,
PathCodec pathCodec,
boolean enableBucketDelete,
String markerFilePattern) {
String markerFilePattern,
boolean statusParallelEnabled) {
this.performanceCacheOptions = performanceCacheOptions;
this.performanceCacheEnabled = performanceCacheEnabled;
this.cloudStorageOptions = cloudStorageOptions;
this.shouldIncludeInTimestampUpdatesPredicate = shouldIncludeInTimestampUpdatesPredicate;
this.pathCodec = pathCodec;
this.enableBucketDelete = enableBucketDelete;
this.markerFilePattern = Pattern.compile("^(.+/)?" + markerFilePattern + "$");
this.statusParallelEnabled = statusParallelEnabled;
}

public PerformanceCachingGoogleCloudStorageOptions getPerformanceCacheOptions() {
Expand Down Expand Up @@ -253,11 +267,15 @@ public Pattern getMarkerFilePattern() {
return markerFilePattern;
}

public boolean isStatusParallelEnabled() {
return statusParallelEnabled;
}

public void throwIfNotValid() {
checkNotNull(
shouldIncludeInTimestampUpdatesPredicate,
"Predicate for ignored directory updates should not be null."
+ " Consider Predicates.alwasyTrue");
+ " Consider Predicates.alwaysTrue");
cloudStorageOptions.throwIfNotValid();
}
}
Loading

0 comments on commit 3726b97

Please sign in to comment.