Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move implicit directory repair from list to delete operations #156

Merged
merged 7 commits into from
Apr 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions gcs/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

7. Remove deprecated `fs.gs.file.size.limit.250gb` property.

8. Repair implicit directories during delete operations instead of list
operations.


1.9.14 - 2019-02-13

1. Implement Hadoop File System `concat` method using GCS compose API.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1007,8 +1007,7 @@ public FileStatus[] listStatus(Path hadoopPath)
List<FileStatus> status;

try {
List<FileInfo> fileInfos =
getGcsFs().listFileInfo(gcsPath, isAutoRepairImplicitDirectoriesEnabled());
List<FileInfo> fileInfos = getGcsFs().listFileInfo(gcsPath);
status = new ArrayList<>(fileInfos.size());
String userName = getUgiUserName();
for (FileInfo fileInfo : fileInfos) {
Expand All @@ -1025,11 +1024,6 @@ public FileStatus[] listStatus(Path hadoopPath)
return status.toArray(new FileStatus[0]);
}

private boolean isAutoRepairImplicitDirectoriesEnabled() {
GoogleCloudStorageFileSystemOptions gcsFsOptions = getGcsFs().getOptions();
return gcsFsOptions.getCloudStorageOptions().isAutoRepairImplicitDirectoriesEnabled();
}

/**
* Sets the current working directory to the given path.
*
Expand Down Expand Up @@ -1277,7 +1271,7 @@ public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOExc
return flatGlobInternal(fixedPath, filter);
}

return globInternal(fixedPath, filter, pathPattern);
return super.globStatus(fixedPath, filter);
}

/**
Expand All @@ -1288,7 +1282,7 @@ private FileStatus[] concurrentGlobInternal(Path fixedPath, PathFilter filter, P
throws IOException {
ExecutorService executorService = Executors.newFixedThreadPool(2, DAEMON_THREAD_FACTORY);
Callable<FileStatus[]> flatGlobTask = () -> flatGlobInternal(fixedPath, filter);
Callable<FileStatus[]> nonFlatGlobTask = () -> globInternal(fixedPath, filter, pathPattern);
Callable<FileStatus[]> nonFlatGlobTask = () -> super.globStatus(fixedPath, filter);

try {
return executorService.invokeAny(Arrays.asList(flatGlobTask, nonFlatGlobTask));
Expand Down Expand Up @@ -1359,40 +1353,9 @@ private FileStatus[] flatGlobInternal(Path fixedPath, PathFilter filter) throws

FileStatus[] returnList = filteredStatuses.toArray(new FileStatus[0]);

// If the return list contains directories, we should repair them if they're 'implicit'.
if (isAutoRepairImplicitDirectoriesEnabled()) {
List<URI> toRepair = new ArrayList<>();
for (FileStatus status : returnList) {
if (isImplicitDirectory(status)) {
toRepair.add(getGcsPath(status.getPath()));
}
}
if (!toRepair.isEmpty()) {
logger.atWarning().log(
"Discovered %s implicit directories to repair within return values.", toRepair.size());
getGcsFs().repairDirs(toRepair);
}
}

return returnList;
}

private FileStatus[] globInternal(Path fixedPath, PathFilter filter, Path pathPattern)
throws IOException {
FileStatus[] ret = super.globStatus(fixedPath, filter);
if (ret == null) {
if (isAutoRepairImplicitDirectoriesEnabled()) {
logger.atFine().log(
"GHFS.globStatus returned null for '%s', attempting possible repair.", pathPattern);
if (getGcsFs().repairPossibleImplicitDirectory(getGcsPath(fixedPath))) {
logger.atWarning().log("Success repairing '%s', re-globbing.", pathPattern);
ret = super.globStatus(fixedPath, filter);
}
}
}
return ret;
}

private static boolean isImplicitDirectory(FileStatus curr) {
// Modification time of 0 indicates implicit directory.
return curr.isDir() && curr.getModificationTime() == 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,7 @@ public class GoogleHadoopFileSystemConfiguration {

/**
   * Configuration key for enabling automatic repair of implicit directories whenever they are
   * detected during delete operations.
*/
public static final GoogleHadoopFileSystemConfigurationProperty<Boolean>
GCS_REPAIR_IMPLICIT_DIRECTORIES_ENABLE =
Expand All @@ -283,9 +282,7 @@ public class GoogleHadoopFileSystemConfiguration {
/**
* Configuration key for enabling automatic inference of implicit directories. If set, we create
* and return in-memory directory objects on the fly when no backing object exists, but we know
* there are files with the same prefix. The ENABLE_REPAIR flag takes precedence over this flag:
* if both are set, the repair is attempted, and only if it fails does the setting of this flag
* kick in.
* there are files with the same prefix.
*/
public static final GoogleHadoopFileSystemConfigurationProperty<Boolean>
GCS_INFER_IMPLICIT_DIRECTORIES_ENABLE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@
import static com.google.common.truth.Truth.assertWithMessage;
import static org.junit.Assert.assertThrows;

import com.google.cloud.hadoop.gcsio.GoogleCloudStorage;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystemIntegrationTest;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystemOptions.TimestampUpdatePredicate;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageOptions;
import com.google.cloud.hadoop.gcsio.StorageResourceId;
import com.google.cloud.hadoop.gcsio.testing.TestConfiguration;
import com.google.cloud.hadoop.testing.TestingAccessTokenProvider;
import com.google.common.base.Joiner;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Writer;
import java.net.URI;
Expand Down Expand Up @@ -95,9 +96,6 @@ protected static Configuration loadConfig(
GoogleHadoopFileSystemConfiguration.GCS_INFER_IMPLICIT_DIRECTORIES_ENABLE.getKey(), false);
// Allow buckets to be deleted in test cleanup:
config.setBoolean(GoogleHadoopFileSystemConfiguration.GCE_BUCKET_DELETE_ENABLE.getKey(), true);
// Disable concurrent globbing because it's unpredictable in auto repairing parent directory
config.setBoolean(
GoogleHadoopFileSystemConfiguration.GCS_CONCURRENT_GLOB_ENABLE.getKey(), false);
return config;
}

Expand Down Expand Up @@ -191,12 +189,9 @@ public void testGetCanonicalServiceName() {
assertThat(ghfs.getCanonicalServiceName()).isNull();
}

/**
* Makes listStatus and globStatus perform repairs by first creating an object directly without
* creating its parent directory object.
*/
  /** Tests handling of implicit directories. */
@Test
public void testRepairImplicitDirectory() throws IOException, URISyntaxException {
public void testImplicitDirectory() throws IOException {
String bucketName = sharedBucketName1;
GoogleHadoopFileSystemBase myghfs = (GoogleHadoopFileSystemBase) ghfs;
GoogleCloudStorageFileSystem gcsfs = myghfs.getGcsFs();
Expand All @@ -215,90 +210,66 @@ public void testRepairImplicitDirectory() throws IOException, URISyntaxException

boolean inferredDirExists =
gcsfs.getOptions().getCloudStorageOptions().isInferImplicitDirectoriesEnabled();
boolean inferredOrRepairedDirExists =
gcsfs.getOptions().getCloudStorageOptions().isInferImplicitDirectoriesEnabled()
|| gcsfs.getOptions().getCloudStorageOptions().isAutoRepairImplicitDirectoriesEnabled();

assertDirectory(gcsfs, leafUri, /* exists= */ true);
assertDirectory(gcsfs, subdirUri, inferredDirExists);
assertDirectory(gcsfs, parentUri, inferredDirExists);

if (inferredOrRepairedDirExists) {
myghfs.listStatus(parentPath);
} else {
assertThrows(FileNotFoundException.class, () -> myghfs.listStatus(parentPath));
}

assertDirectory(gcsfs, leafUri, /* exists= */ true);
assertDirectory(gcsfs, subdirUri, inferredOrRepairedDirExists);
assertDirectory(gcsfs, parentUri, inferredOrRepairedDirExists);

ghfsHelper.clearBucket(bucketName);
}

// Reset for globStatus.
gcsfs.mkdir(leafUri);
  /**
   * Tests that an implicit parent directory is repaired (created) when an object beneath it is
   * deleted.
   *
   * @throws IOException on I/O error
   */
@Test
public void testRepairDirectory() throws IOException {
String bucketName = sharedBucketName1;
GoogleHadoopFileSystemBase myghfs = (GoogleHadoopFileSystemBase) ghfs;
GoogleCloudStorageFileSystem gcsfs = myghfs.getGcsFs();
GoogleCloudStorage gcs = gcsfs.getGcs();
URI seedUri = GoogleCloudStorageFileSystemIntegrationTest.getTempFilePath();
Path dirPath = ghfsHelper.castAsHadoopPath(seedUri);
URI dirUri = myghfs.getGcsPath(dirPath);

assertDirectory(gcsfs, leafUri, /* exists= */ true);
assertDirectory(gcsfs, subdirUri, inferredDirExists);
assertDirectory(gcsfs, parentUri, inferredDirExists);
    // An object path gs://<bucket>/<generated-tempdir>/empty-object where neither the
    // object nor its parent directory gs://<bucket>/<generated-tempdir> exists yet.
Path emptyObject = new Path(dirPath, "empty-object");
URI objUri = myghfs.getGcsPath(emptyObject);
StorageResourceId resource = gcsfs.getPathCodec().validatePathAndGetId(objUri, false);
gcs.createEmptyObject(resource);

myghfs.globStatus(parentPath);
boolean inferImplicitDirectories =
gcsfs.getOptions().getCloudStorageOptions().isInferImplicitDirectoriesEnabled();

// Globbing the single directory only repairs that top-level directory; it is *not* the same
// as listStatus.
assertDirectory(gcsfs, leafUri, /* exists= */ true);
assertDirectory(gcsfs, subdirUri, inferredDirExists);
assertDirectory(gcsfs, parentUri, inferredOrRepairedDirExists);
assertWithMessage(
"Expected to %s: %s", inferImplicitDirectories ? "exist" : "not exist", dirUri)
.that(gcsfs.exists(dirUri))
.isEqualTo(inferImplicitDirectories);

gcsfs.delete(objUri, false);
    // The implicit parent directory should be repaired (created) after deletion of the sole
    // object within it.
assertWithMessage("Expected to exist: %s", dirUri).that(gcsfs.exists(dirUri)).isTrue();
ghfsHelper.clearBucket(bucketName);

// Reset for globStatus(path/*)
gcsfs.mkdir(leafUri);

assertDirectory(gcsfs, leafUri, /* exists= */ true);
assertDirectory(gcsfs, subdirUri, inferredDirExists);
assertDirectory(gcsfs, parentUri, inferredDirExists);

// When globbing children, the parent will only be repaired if flat-globbing is not enabled.
Path globChildrenPath = new Path(parentPath.toString() + "/*");
myghfs.globStatus(globChildrenPath);
boolean expectParentRepair =
!(myghfs.enableFlatGlob && myghfs.couldUseFlatGlob(globChildrenPath));

// This will internally call listStatus, so will have the same behavior of repairing both
// levels of subdirectories.
assertDirectory(gcsfs, leafUri, /* exists= */ true);

assertDirectory(gcsfs, subdirUri, inferredOrRepairedDirExists);

if (expectParentRepair || inferredDirExists) {
assertWithMessage("Expected to exist: %s", parentUri).that(gcsfs.exists(parentUri)).isTrue();
} else {
assertWithMessage("Expected not to exist due to flat globbing: %s", parentUri)
.that(gcsfs.exists(parentUri))
.isFalse();
    // Test implicit directory repair after a subdirectory (rather than a single object) has been
    // deleted recursively.
if (inferImplicitDirectories) {
      // Only when implicit-directory inference is enabled can a directory that has no explicit
      // directory entry be deleted without a FileNotFoundException.
Path subDir = new Path(dirPath, "subdir");
emptyObject = new Path(subDir, "empty-object");
objUri = myghfs.getGcsPath(emptyObject);
resource = gcsfs.getPathCodec().validatePathAndGetId(objUri, false);
gcs.createEmptyObject(resource);
URI subdirUri = myghfs.getGcsPath(subDir);
assertWithMessage("Expected to exist: %s", dirUri).that(gcsfs.exists(dirUri)).isTrue();
assertWithMessage("Expected to exist: %s", subdirUri).that(gcsfs.exists(subdirUri)).isTrue();
gcsfs.delete(subdirUri, true);
      // The implicit parent directory should be repaired (created) after deletion of the sole
      // subdirectory within it.
assertWithMessage("Expected to exist: %s", dirUri).that(gcsfs.exists(dirUri)).isTrue();
ghfsHelper.clearBucket(bucketName);
}

ghfsHelper.clearBucket(bucketName);

// Reset for globStatus(path*)
gcsfs.mkdir(leafUri);

assertDirectory(gcsfs, leafUri, /* exists= */ true);
assertDirectory(gcsfs, subdirUri, inferredDirExists);
assertDirectory(gcsfs, parentUri, inferredDirExists);

// Globbing with a wildcard in the parentUri itself also only repairs one level, but for
// a different reason than globbing with no wildcard. Globbing with no wildcard requires
// catching 'null' in globStatus, whereas having the wildcard causes the repair to happen
// when listing parentOf(parentUri).
myghfs.globStatus(new Path(parentPath.toString() + "*"));

assertDirectory(gcsfs, leafUri, /* exists= */ true);
assertDirectory(gcsfs, subdirUri, inferredDirExists);
assertDirectory(gcsfs, parentUri, inferredOrRepairedDirExists);

ghfsHelper.clearBucket(bucketName);
}

private static void assertDirectory(GoogleCloudStorageFileSystem gcsfs, URI path, boolean exists)
Expand Down
Loading