Skip to content

Commit

Permalink
Mitigation for remote snapshot filecache overflow (opensearch-project…
Browse files Browse the repository at this point in the history
…#15077)

TransferManager fails BlobFetchRequest on full cache

Signed-off-by: Finn Carroll <carrofin@amazon.com>
  • Loading branch information
finnegancarroll committed Sep 5, 2024
1 parent f5c897c commit 8f34ce5
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix unchecked cast in dynamic action map getter ([#15394](https://github.com/opensearch-project/OpenSearch/pull/15394))
- Fix null values indexed as "null" strings in flat_object field ([#14069](https://github.com/opensearch-project/OpenSearch/pull/14069))
- Fix terms query on wildcard field returns nothing ([#15607](https://github.com/opensearch-project/OpenSearch/pull/15607))
- Fix remote snapshot file_cache exceeding capacity ([#15077](https://github.com/opensearch-project/OpenSearch/pull/15077))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,19 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio
@SuppressWarnings("removal")
private static FileCachedIndexInput createIndexInput(FileCache fileCache, StreamReader streamReader, BlobFetchRequest request) {
try {
// This local file cache is ref counted and may not strictly enforce configured capacity.
// If we find available capacity is exceeded, deny further BlobFetchRequests.
if (fileCache.capacity() < fileCache.usage().usage()) {
fileCache.prune();
throw new IOException(
"Local file cache capacity ("
+ fileCache.capacity()
+ ") exceeded ("
+ fileCache.usage().usage()
+ ") - BlobFetchRequest failed: "
+ request.getFilePath()
);
}
if (Files.exists(request.getFilePath()) == false) {
logger.trace("Fetching from Remote in createIndexInput of Transfer Manager");
try (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void testConcurrentAccess() throws Exception {
}
}

public void testFetchBlobWithConcurrentCacheEvictions() throws Exception {
public void testFetchBlobWithConcurrentCacheEvictions() {
// Submit 256 tasks to an executor with 16 threads that will each randomly
// request one of eight blobs. Given that the cache can only hold two
// blobs this will lead to a huge amount of contention and thrashing.
Expand All @@ -114,41 +114,34 @@ public void testFetchBlobWithConcurrentCacheEvictions() throws Exception {
try (IndexInput indexInput = fetchBlobWithName(blobname)) {
assertIndexInputIsFunctional(indexInput);
}
} catch (IOException ignored) { // fetchBlobWithName may fail due to fixed capacity
} catch (Exception e) {
throw new AssertionError(e);
}
}));
}
// Wait for all threads to complete
for (Future<?> future : futures) {
future.get(10, TimeUnit.SECONDS);
try {
for (Future<?> future : futures) {
future.get(10, TimeUnit.SECONDS);
}
} catch (java.util.concurrent.ExecutionException ignored) { // Index input may be null
} catch (Exception e) {
throw new AssertionError(e);
}

} finally {
assertTrue(terminate(testRunner));
}
MatcherAssert.assertThat("Expected many evictions to happen", fileCache.stats().evictionCount(), greaterThan(0L));
}

public void testUsageExceedsCapacity() throws Exception {
// Fetch resources that exceed the configured capacity of the cache and assert that the
// returned IndexInputs are still functional.
try (IndexInput i1 = fetchBlobWithName("1"); IndexInput i2 = fetchBlobWithName("2"); IndexInput i3 = fetchBlobWithName("3")) {
assertIndexInputIsFunctional(i1);
assertIndexInputIsFunctional(i2);
assertIndexInputIsFunctional(i3);
MatcherAssert.assertThat(fileCache.usage().activeUsage(), equalTo((long) EIGHT_MB * 3));
MatcherAssert.assertThat(fileCache.usage().usage(), equalTo((long) EIGHT_MB * 3));
}
MatcherAssert.assertThat(fileCache.usage().activeUsage(), equalTo(0L));
MatcherAssert.assertThat(fileCache.usage().usage(), equalTo((long) EIGHT_MB * 3));
// Fetch another resource which will trigger an eviction
try (IndexInput i1 = fetchBlobWithName("1")) {
assertIndexInputIsFunctional(i1);
MatcherAssert.assertThat(fileCache.usage().activeUsage(), equalTo((long) EIGHT_MB));
MatcherAssert.assertThat(fileCache.usage().usage(), equalTo((long) EIGHT_MB));
}
MatcherAssert.assertThat(fileCache.usage().activeUsage(), equalTo(0L));
MatcherAssert.assertThat(fileCache.usage().usage(), equalTo((long) EIGHT_MB));
public void testOverflowDisabled() throws Exception {
initializeTransferManager();
IndexInput i1 = fetchBlobWithName("1");
IndexInput i2 = fetchBlobWithName("2");

assertThrows(IOException.class, () -> { IndexInput i3 = fetchBlobWithName("3"); });
}

public void testDownloadFails() throws Exception {
Expand Down

0 comments on commit 8f34ce5

Please sign in to comment.