diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java index 8c7e196d7c812..607f186f2db1e 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java @@ -483,7 +483,7 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro if (throwExceptionOnFinalizeUpload) { throw new RuntimeException(); } - }, false, null), completionListener); + }, false, null, null), completionListener); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); // wait for completableFuture to finish @@ -533,7 +533,7 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro if (throwExceptionOnFinalizeUpload) { throw new RuntimeException(); } - }, false, null), completionListener); + }, false, null, null), completionListener); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); if (expectException || throwExceptionOnFinalizeUpload) { @@ -644,7 +644,7 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro } }, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize)); } - }, blobSize, false, WritePriority.HIGH, uploadSuccess -> { assertTrue(uploadSuccess); }, false, null), completionListener); + }, blobSize, false, WritePriority.HIGH, uploadSuccess -> { assertTrue(uploadSuccess); }, false, null, null), completionListener); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); if (expectException) { diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java index ceab06bd051e9..70f41e647648e 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -344,7 +344,7 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro } }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize)); } - }, bytes.length, false, WritePriority.NORMAL, Assert::assertTrue, false, null), completionListener); + }, bytes.length, false, WritePriority.NORMAL, Assert::assertTrue, false, null, null), completionListener); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java index e74462f82400d..3373f32845c27 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java @@ -13,6 +13,7 @@ import org.opensearch.common.StreamContext; import java.io.IOException; +import java.util.Map; /** * WriteContext is used to encapsulate all data needed by BlobContainer#writeStreams @@ -29,6 +30,7 @@ public class WriteContext { private final CheckedConsumer uploadFinalizer; private final boolean doRemoteDataIntegrityCheck; private final Long expectedChecksum; + private Map metadata; /** * Construct a new WriteContext object @@ -49,7 +51,8 @@ public WriteContext( WritePriority writePriority, CheckedConsumer uploadFinalizer, boolean doRemoteDataIntegrityCheck, - @Nullable Long expectedChecksum + @Nullable Long expectedChecksum, + Map metadata ) { this.fileName = fileName; this.streamContextSupplier = streamContextSupplier; @@ -59,6 +62,7 @@ public WriteContext( this.uploadFinalizer = uploadFinalizer; this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck; this.expectedChecksum = expectedChecksum; + this.metadata = metadata; } /** @@ -131,4 +135,11 @@ public boolean doRemoteDataIntegrityCheck() { public Long getExpectedChecksum() { return expectedChecksum; } + + /** + * @return the upload metadata. + */ + public Map getMetadata() { + return metadata; + } } diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java index 2047c99d9e13b..8a0f7cdb4a474 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java @@ -27,6 +27,7 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; @@ -55,6 +56,7 @@ public class RemoteTransferContainer implements Closeable { private final OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier; private final boolean isRemoteDataIntegritySupported; private final AtomicBoolean readBlock = new AtomicBoolean(); + private Map metadata = null; private static final Logger log = LogManager.getLogger(RemoteTransferContainer.class); @@ -90,6 +92,41 @@ public RemoteTransferContainer( this.isRemoteDataIntegritySupported = isRemoteDataIntegritySupported; } + /** + * Construct a new RemoteTransferContainer object with metadata. + * + * @param fileName Name of the local file + * @param remoteFileName Name of the remote file + * @param contentLength Total content length of the file to be uploaded + * @param failTransferIfFileExists A boolean to determine if upload has to be failed if file exists + * @param writePriority The {@link WritePriority} of current upload + * @param offsetRangeInputStreamSupplier A supplier to create OffsetRangeInputStreams + * @param expectedChecksum The expected checksum value for the file being uploaded. This checksum will be used for local or remote data integrity checks + * @param isRemoteDataIntegritySupported A boolean to signify whether the remote repository supports server side data integrity verification + * @param metadata Object metadata to be store with the file. + */ + public RemoteTransferContainer( + String fileName, + String remoteFileName, + long contentLength, + boolean failTransferIfFileExists, + WritePriority writePriority, + OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier, + long expectedChecksum, + boolean isRemoteDataIntegritySupported, + Map metadata + ) { + this.fileName = fileName; + this.remoteFileName = remoteFileName; + this.contentLength = contentLength; + this.failTransferIfFileExists = failTransferIfFileExists; + this.writePriority = writePriority; + this.offsetRangeInputStreamSupplier = offsetRangeInputStreamSupplier; + this.expectedChecksum = expectedChecksum; + this.isRemoteDataIntegritySupported = isRemoteDataIntegritySupported; + this.metadata = metadata; + } + /** * @return The {@link WriteContext} for the current upload */ @@ -102,7 +139,8 @@ public WriteContext createWriteContext() { writePriority, this::finalizeUpload, isRemoteDataIntegrityCheckPossible(), - isRemoteDataIntegrityCheckPossible() ? expectedChecksum : null + isRemoteDataIntegrityCheckPossible() ? expectedChecksum : null, + metadata ); }