Skip to content

Commit

Permalink
provide metadata in remoteTransferContainer for async translog upload…
Browse files Browse the repository at this point in the history
… flow

Signed-off-by: Sandeep Kumawat <skumwt@amazon.com>
  • Loading branch information
Sandeep Kumawat committed Apr 3, 2024
1 parent 950e55b commit 85711ae
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro
if (throwExceptionOnFinalizeUpload) {
throw new RuntimeException();
}
}, false, null), completionListener);
}, false, null, null), completionListener);

assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
// wait for completableFuture to finish
Expand Down Expand Up @@ -533,7 +533,7 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro
if (throwExceptionOnFinalizeUpload) {
throw new RuntimeException();
}
}, false, null), completionListener);
}, false, null, null), completionListener);

assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
if (expectException || throwExceptionOnFinalizeUpload) {
Expand Down Expand Up @@ -644,7 +644,7 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro
}
}, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize));
}
}, blobSize, false, WritePriority.HIGH, uploadSuccess -> { assertTrue(uploadSuccess); }, false, null), completionListener);
}, blobSize, false, WritePriority.HIGH, uploadSuccess -> { assertTrue(uploadSuccess); }, false, null, null), completionListener);

assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
if (expectException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro
}
}, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));
}
}, bytes.length, false, WritePriority.NORMAL, Assert::assertTrue, false, null), completionListener);
}, bytes.length, false, WritePriority.NORMAL, Assert::assertTrue, false, null, null), completionListener);

assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.common.StreamContext;

import java.io.IOException;
import java.util.Map;

/**
* WriteContext is used to encapsulate all data needed by <code>BlobContainer#writeStreams</code>
Expand All @@ -29,6 +30,7 @@ public class WriteContext {
private final CheckedConsumer<Boolean, IOException> uploadFinalizer;
private final boolean doRemoteDataIntegrityCheck;
private final Long expectedChecksum;
private Map<String, String> metadata;

/**
* Construct a new WriteContext object
Expand All @@ -49,7 +51,8 @@ public WriteContext(
WritePriority writePriority,
CheckedConsumer<Boolean, IOException> uploadFinalizer,
boolean doRemoteDataIntegrityCheck,
@Nullable Long expectedChecksum
@Nullable Long expectedChecksum,
Map<String, String> metadata
) {
this.fileName = fileName;
this.streamContextSupplier = streamContextSupplier;
Expand All @@ -59,6 +62,7 @@ public WriteContext(
this.uploadFinalizer = uploadFinalizer;
this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck;
this.expectedChecksum = expectedChecksum;
this.metadata = metadata;
}

/**
Expand Down Expand Up @@ -131,4 +135,11 @@ public boolean doRemoteDataIntegrityCheck() {
public Long getExpectedChecksum() {
return expectedChecksum;
}

/**
* @return the upload metadata.
*/
public Map<String, String> getMetadata() {
return metadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
Expand Down Expand Up @@ -55,6 +56,7 @@ public class RemoteTransferContainer implements Closeable {
private final OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier;
private final boolean isRemoteDataIntegritySupported;
private final AtomicBoolean readBlock = new AtomicBoolean();
private Map<String, String> metadata = null;

private static final Logger log = LogManager.getLogger(RemoteTransferContainer.class);

Expand Down Expand Up @@ -90,6 +92,41 @@ public RemoteTransferContainer(
this.isRemoteDataIntegritySupported = isRemoteDataIntegritySupported;
}

/**
* Construct a new RemoteTransferContainer object with metadata.
*
* @param fileName Name of the local file
* @param remoteFileName Name of the remote file
* @param contentLength Total content length of the file to be uploaded
* @param failTransferIfFileExists A boolean to determine if upload has to be failed if file exists
* @param writePriority The {@link WritePriority} of current upload
* @param offsetRangeInputStreamSupplier A supplier to create OffsetRangeInputStreams
* @param expectedChecksum The expected checksum value for the file being uploaded. This checksum will be used for local or remote data integrity checks
* @param isRemoteDataIntegritySupported A boolean to signify whether the remote repository supports server side data integrity verification
* @param metadata Object metadata to be store with the file.
*/
public RemoteTransferContainer(
String fileName,
String remoteFileName,
long contentLength,
boolean failTransferIfFileExists,
WritePriority writePriority,
OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier,
long expectedChecksum,
boolean isRemoteDataIntegritySupported,
Map<String, String> metadata
) {
this.fileName = fileName;
this.remoteFileName = remoteFileName;
this.contentLength = contentLength;
this.failTransferIfFileExists = failTransferIfFileExists;
this.writePriority = writePriority;
this.offsetRangeInputStreamSupplier = offsetRangeInputStreamSupplier;
this.expectedChecksum = expectedChecksum;
this.isRemoteDataIntegritySupported = isRemoteDataIntegritySupported;
this.metadata = metadata;
}

/**
* @return The {@link WriteContext} for the current upload
*/
Expand All @@ -102,7 +139,8 @@ public WriteContext createWriteContext() {
writePriority,
this::finalizeUpload,
isRemoteDataIntegrityCheckPossible(),
isRemoteDataIntegrityCheckPossible() ? expectedChecksum : null
isRemoteDataIntegrityCheckPossible() ? expectedChecksum : null,
metadata
);
}

Expand Down

0 comments on commit 85711ae

Please sign in to comment.