Skip to content

Commit

Permalink
Addressing PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Raghuvansh Raj <raghraaj@amazon.com>
  • Loading branch information
raghuvanshraj committed Jun 5, 2023
1 parent 51ba892 commit 18da4ea
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.util.UploadListener;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;

import java.io.IOException;
Expand All @@ -30,7 +31,7 @@ public class FileUploader {

private static final Logger logger = LogManager.getLogger(FileUploader.class);

private final UploadTracker uploadTracker;
private final UploadListener uploadListener;

private final RemoteSegmentStoreDirectory remoteDirectory;

Expand All @@ -41,13 +42,13 @@ public class FileUploader {
private final CheckedFunction<String, String, IOException> checksumProvider;

public FileUploader(
UploadTracker uploadTracker,
UploadListener uploadListener,
RemoteSegmentStoreDirectory remoteDirectory,
Directory storeDirectory,
Set<String> excludeFiles,
CheckedFunction<String, String, IOException> checksumProvider
) {
this.uploadTracker = uploadTracker;
this.uploadListener = uploadListener;
this.remoteDirectory = remoteDirectory;
this.storeDirectory = storeDirectory;
this.excludeFiles = excludeFiles;
Expand All @@ -64,7 +65,7 @@ public FileUploader(
*/
public boolean uploadFiles(Collection<String> files) throws Exception {
Collection<String> filteredFiles = files.stream().filter(file -> !skipUpload(file)).collect(Collectors.toList());
return remoteDirectory.copyFilesFrom(storeDirectory, filteredFiles, IOContext.DEFAULT, uploadTracker);
return remoteDirectory.copyFilesFrom(storeDirectory, filteredFiles, IOContext.DEFAULT, uploadListener);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.UploadListener;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.engine.InternalEngine;
Expand Down Expand Up @@ -133,7 +134,7 @@ public RemoteStoreRefreshListener(
this.segmentTracker = segmentTracker;
resetBackOffDelayIterator();
this.checkpointPublisher = checkpointPublisher;
this.fileUploader = new FileUploader(new UploadTracker() {
this.fileUploader = new FileUploader(new UploadListener() {
@Override
public void beforeUpload(String file) {
// Start tracking the upload bytes started
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.opensearch.common.io.VersionedCodecStreamWrapper;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.common.util.ByteUtils;
import org.opensearch.index.shard.UploadTracker;
import org.opensearch.common.util.UploadListener;
import org.opensearch.index.store.exception.ChecksumCombinationException;
import org.opensearch.index.store.lockmanager.FileLockInfo;
import org.opensearch.index.store.lockmanager.RemoteStoreCommitLevelLockManager;
Expand Down Expand Up @@ -356,46 +356,46 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
* @param from The directory for all files to be uploaded
* @param files A list containing the names of all files to be uploaded
* @param context IOContext to be used to open IndexInput to files during remote upload
* @param uploadTracker An {@link UploadTracker} for tracking file uploads
* @param uploadListener An {@link UploadListener} for tracking file uploads
* @throws Exception When upload future creation fails or if {@link RemoteSegmentStoreDirectory#copyFrom(Directory, String, String, IOContext, boolean)}
* throws an exception
*/
public boolean copyFilesFrom(Directory from, Collection<String> files, IOContext context, UploadTracker uploadTracker)
public boolean copyFilesFrom(Directory from, Collection<String> files, IOContext context, UploadListener uploadListener)
throws Exception {

List<CompletableFuture<Void>> resultFutures = new ArrayList<>();

boolean uploadOfAllFilesSuccessful = true;
for (String src : files) {
String remoteFilename = createRemoteFileName(src, false);
uploadTracker.beforeUpload(src);
uploadListener.beforeUpload(src);
if (remoteDataDirectory.getBlobContainer().isMultiStreamUploadSupported()) {
try {
CompletableFuture<Void> resultFuture = createUploadFuture(from, src, remoteFilename, context);
resultFuture.whenComplete((uploadResponse, throwable) -> {
if (throwable != null) {
uploadTracker.onFailure(src);
uploadListener.onFailure(src);
} else {
uploadTracker.onSuccess(src);
uploadListener.onSuccess(src);
}
});
resultFutures.add(resultFuture);
} catch (Exception e) {
uploadTracker.onFailure(src);
uploadListener.onFailure(src);
throw e;
}
} else {
boolean success = true;
try {
copyFrom(from, src, src, context, false);
uploadTracker.onSuccess(src);
uploadListener.onSuccess(src);
} catch (IOException e) {
success = false;
uploadOfAllFilesSuccessful = false;
logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", src), e);
} finally {
if (!success) {
uploadTracker.onFailure(src);
uploadListener.onFailure(src);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class RemoteSegmentStoreDirectoryTests extends OpenSearchTestCase {
private RemoteStoreMetadataLockManager mdLockManager;

private RemoteSegmentStoreDirectory remoteSegmentStoreDirectory;
private TestUploadTracker testUploadTracker;
private TestUploadListener testUploadListener;

@Before
public void setup() throws IOException {
Expand All @@ -66,7 +66,7 @@ public void setup() throws IOException {
mdLockManager = mock(RemoteStoreMetadataLockManager.class);

remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(remoteDataDirectory, remoteMetadataDirectory, mdLockManager);
testUploadTracker = new TestUploadTracker();
testUploadListener = new TestUploadListener();
}

public void testUploadedSegmentMetadataToString() {
Expand Down Expand Up @@ -523,10 +523,10 @@ public void testCopyFilesFromMultipart() throws Exception {
uploadResponseCompletableFuture.complete(null);
when(blobContainer.writeBlobByStreams(any(WriteContext.class))).thenReturn(uploadResponseCompletableFuture);

remoteSegmentStoreDirectory.copyFilesFrom(storeDirectory, List.of(filename), IOContext.DEFAULT, testUploadTracker);
remoteSegmentStoreDirectory.copyFilesFrom(storeDirectory, List.of(filename), IOContext.DEFAULT, testUploadListener);

assertTrue(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename));
assertEquals(TestUploadTracker.UploadStatus.UPLOAD_SUCCESS, testUploadTracker.getUploadStatus(filename));
assertEquals(TestUploadListener.UploadStatus.UPLOAD_SUCCESS, testUploadListener.getUploadStatus(filename));

storeDirectory.close();
}
Expand Down Expand Up @@ -554,11 +554,11 @@ public void testCopyFilesFromMultipartIOException() throws Exception {

assertThrows(
IOException.class,
() -> remoteSegmentStoreDirectory.copyFilesFrom(storeDirectory, List.of(filename), IOContext.DEFAULT, testUploadTracker)
() -> remoteSegmentStoreDirectory.copyFilesFrom(storeDirectory, List.of(filename), IOContext.DEFAULT, testUploadListener)
);

assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename));
assertEquals(TestUploadTracker.UploadStatus.UPLOAD_FAILURE, testUploadTracker.getUploadStatus(filename));
assertEquals(TestUploadListener.UploadStatus.UPLOAD_FAILURE, testUploadListener.getUploadStatus(filename));

storeDirectory.close();
}
Expand Down Expand Up @@ -586,11 +586,11 @@ public void testCopyFilesFromMultipartUploadFutureCompletedExceptionally() throw

assertThrows(
ExecutionException.class,
() -> remoteSegmentStoreDirectory.copyFilesFrom(storeDirectory, List.of(filename), IOContext.DEFAULT, testUploadTracker)
() -> remoteSegmentStoreDirectory.copyFilesFrom(storeDirectory, List.of(filename), IOContext.DEFAULT, testUploadListener)
);

assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename));
assertEquals(TestUploadTracker.UploadStatus.UPLOAD_FAILURE, testUploadTracker.getUploadStatus(filename));
assertEquals(TestUploadListener.UploadStatus.UPLOAD_FAILURE, testUploadListener.getUploadStatus(filename));

storeDirectory.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@

package org.opensearch.index.store;

import org.opensearch.index.shard.UploadTracker;
import org.opensearch.common.util.UploadListener;

import java.util.concurrent.ConcurrentHashMap;

public class TestUploadTracker implements UploadTracker {
public class TestUploadListener implements UploadListener {

private final ConcurrentHashMap<String, UploadStatus> uploadStatusMap = new ConcurrentHashMap<>();

Expand Down

0 comments on commit 18da4ea

Please sign in to comment.