Skip to content

Commit

Permalink
Add multipart upload integration for translog and segment files
Browse files Browse the repository at this point in the history
Signed-off-by: Raghuvansh Raj <raghraaj@amazon.com>
  • Loading branch information
raghuvanshraj committed Apr 12, 2023
1 parent 53b128f commit 716c219
Show file tree
Hide file tree
Showing 19 changed files with 884 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreIT extends OpenSearchIntegTestCase {

private static final String REPOSITORY_NAME = "test-remore-store-repo";
protected static final String REPOSITORY_NAME = "test-remore-store-repo";
private static final String INDEX_NAME = "remote-store-test-idx-1";
private static final String TOTAL_OPERATIONS = "total-operations";
private static final String REFRESHED_OR_FLUSHED_OPERATIONS = "refreshed-or-flushed-operations";
Expand All @@ -59,7 +59,7 @@ public Settings indexSettings() {
return remoteStoreIndexSettings(0);
}

private Settings remoteStoreIndexSettings(int numberOfReplicas) {
protected Settings remoteStoreIndexSettings(int numberOfReplicas) {
return Settings.builder()
.put(super.indexSettings())
.put("index.refresh_interval", "300s")
Expand Down Expand Up @@ -98,14 +98,22 @@ protected Settings featureFlagSettings() {
public void setup() {
internalCluster().startClusterManagerOnlyNode();
Path absolutePath = randomRepoPath().toAbsolutePath();
putRepository(absolutePath);
}

protected void putRepository(Path path) {
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", path))
);
}

protected void deleteRepository() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}

@After
public void teardown() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
deleteRepository();
}

private IndexResponse indexSingleDoc() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore.multipart;

import org.opensearch.common.settings.Settings;
import org.opensearch.plugins.Plugin;
import org.opensearch.remotestore.RemoteStoreIT;
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;

import java.nio.file.Path;
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public class RemoteStoreMultipartIT extends RemoteStoreIT {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockFsRepositoryPlugin.class)).collect(Collectors.toList());
}

@Override
protected void putRepository(Path path) {
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME)
.setType(MockFsRepositoryPlugin.TYPE)
.setSettings(Settings.builder().put("location", path))
);
}

@Override
protected void deleteRepository() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore.multipart.mocks;

import org.opensearch.common.Stream;
import org.opensearch.common.StreamProvider;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.fs.FsBlobContainer;
import org.opensearch.common.blobstore.fs.FsBlobStore;
import org.opensearch.common.blobstore.stream.StreamContext;
import org.opensearch.common.blobstore.stream.write.UploadResponse;
import org.opensearch.common.blobstore.stream.write.WriteContext;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class MockFsBlobContainer extends FsBlobContainer {

private static final int TRANSFER_TIMEOUT_MILLIS = 30000;

public MockFsBlobContainer(FsBlobStore blobStore, BlobPath blobPath, Path path) {
super(blobStore, blobPath, path);
}

@Override
public boolean isMultiStreamUploadSupported() {
return true;
}

@Override
public CompletableFuture<UploadResponse> writeBlobByStreams(WriteContext writeContext) throws IOException {
CompletableFuture<UploadResponse> completableFuture = new CompletableFuture<>();

int nParts = 10;
long partSize = writeContext.getFileSize() / nParts;
StreamContext streamContext = writeContext.getStreamContext(partSize);
StreamProvider streamProvider = streamContext.getStreamProvider();
final Path file = path.resolve(writeContext.getFileName());
byte[] buffer = new byte[(int) writeContext.getFileSize()];
AtomicLong totalContentRead = new AtomicLong();
CountDownLatch latch = new CountDownLatch(streamContext.getNumberOfParts());
for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) {
int finalPartIdx = partIdx;
Thread thread = new Thread(() -> {
try {
Stream stream = streamProvider.provideStream(finalPartIdx);
InputStream inputStream = stream.getInputStream();
long remainingContentLength = stream.getContentLength();
long offset = stream.getOffset();
while (remainingContentLength > 0) {
int readContentLength = inputStream.read(buffer, (int) offset, (int) remainingContentLength);
totalContentRead.addAndGet(readContentLength);
remainingContentLength -= readContentLength;
offset += readContentLength;
}
inputStream.close();
} catch (IOException e) {
completableFuture.completeExceptionally(e);
} finally {
latch.countDown();
}
});
thread.start();
}
try {
if (!latch.await(TRANSFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
throw new IOException("Timed out waiting for file transfer to complete for " + writeContext.getFileName());
}
} catch (InterruptedException e) {
throw new IOException("Await interrupted on CountDownLatch, transfer failed for " + writeContext.getFileName());
}
try (OutputStream outputStream = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)) {
outputStream.write(buffer);
}
if (writeContext.getFileSize() != totalContentRead.get()) {
throw new IOException(
"Incorrect content length read for file "
+ writeContext.getFileName()
+ ", actual file size: "
+ writeContext.getFileSize()
+ ", bytes read: "
+ totalContentRead.get()
);
}
completableFuture.complete(new UploadResponse(true));

return completableFuture;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore.multipart.mocks;

import org.opensearch.OpenSearchException;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.fs.FsBlobStore;

import java.io.IOException;
import java.nio.file.Path;

public class MockFsBlobStore extends FsBlobStore {

public MockFsBlobStore(int bufferSizeInBytes, Path path, boolean readonly) throws IOException {
super(bufferSizeInBytes, path, readonly);
}

@Override
public BlobContainer blobContainer(BlobPath path) {
try {
return new MockFsBlobContainer(this, path, buildAndCreate(path));
} catch (IOException ex) {
throw new OpenSearchException("failed to create blob container", ex);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore.multipart.mocks;

import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.blobstore.fs.FsBlobStore;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.repositories.fs.FsRepository;

public class MockFsRepository extends FsRepository {

public MockFsRepository(
RepositoryMetadata metadata,
Environment environment,
NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService,
RecoverySettings recoverySettings
) {
super(metadata, environment, namedXContentRegistry, clusterService, recoverySettings);
}

@Override
protected BlobStore createBlobStore() throws Exception {
FsBlobStore fsBlobStore = (FsBlobStore) super.createBlobStore();
return new MockFsBlobStore(fsBlobStore.bufferSizeInBytes(), fsBlobStore.path(), isReadOnly());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore.multipart.mocks;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.RepositoryPlugin;
import org.opensearch.repositories.Repository;

import java.util.Collections;
import java.util.Map;

public class MockFsRepositoryPlugin extends Plugin implements RepositoryPlugin {

public static final String TYPE = "fs_multipart_repository";

@Override
public Map<String, Repository.Factory> getRepositories(
Environment env,
NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService,
RecoverySettings recoverySettings
) {
return Collections.singletonMap(
"fs_multipart_repository",
metadata -> new MockFsRepository(metadata, env, namedXContentRegistry, clusterService, recoverySettings)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,16 @@

package org.opensearch.common.blobstore;

import org.opensearch.common.blobstore.stream.write.UploadResponse;
import org.opensearch.common.blobstore.stream.write.WriteContext;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* An interface for managing a repository of blob entries, where each blob entry is just a named group of bytes.
Expand Down Expand Up @@ -124,6 +128,14 @@ default long readBlobPreferredLength() {
*/
void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;

default boolean isMultiStreamUploadSupported() {
return false;
}

default CompletableFuture<UploadResponse> writeBlobByStreams(WriteContext writeContext) throws IOException {
throw new UnsupportedOperationException();
}

/**
* Reads blob content from the input stream and writes it to the container in a new blob with the given name,
* using an atomic write operation if the implementation supports it.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore.stream.write;

/**
* Response object for uploads using <code>BlobContainer#writeBlobByStreams</code>
*/
public class UploadResponse {

private final boolean uploadSuccessful;

/**
* Construct a new UploadResponse object
*
* @param uploadSuccessful Whether the current upload was successful or not
*/
public UploadResponse(boolean uploadSuccessful) {
this.uploadSuccessful = uploadSuccessful;
}

/**
* @return The upload success result
*/
public boolean isUploadSuccessful() {
return uploadSuccessful;
}
}
10 changes: 10 additions & 0 deletions server/src/main/java/org/opensearch/common/util/ByteUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ public static void writeLongLE(long l, byte[] arr, int offset) {
assert l == 0;
}

/** Convert long to a byte array in big-endian format */
public static byte[] toByteArrayBE(long l) {
byte[] result = new byte[8];
for (int i = 7; i >= 0; i--) {
result[i] = (byte) (l & 0xffL);
l >>= 8;
}
return result;
}

/** Write a long in little-endian format. */
public static long readLongLE(byte[] arr, int offset) {
long l = arr[offset++] & 0xFFL;
Expand Down
Loading

0 comments on commit 716c219

Please sign in to comment.