Initial commit for POC #136

Closed · wants to merge 17 commits
@@ -46,11 +46,13 @@
import org.opensearch.common.blobstore.BlobStoreException;
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.support.AbstractBlobContainer;
import org.opensearch.common.blobstore.support.PlainBlobMetadata;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import software.amazon.awssdk.core.exception.SdkException;
@@ -64,10 +66,13 @@
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.ObjectAttributes;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Error;
@@ -210,6 +215,93 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
}
}

@Override
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
try (AmazonAsyncS3Reference amazonS3Reference = SocketAccess.doPrivileged(blobStore::asyncClientReference)) {
final S3AsyncClient s3AsyncClient = amazonS3Reference.get().client();
final String bucketName = blobStore.bucket();

// Fetch object part metadata
GetObjectAttributesRequest getObjectAttributesRequest = GetObjectAttributesRequest.builder()
.bucket(bucketName)
.key(blobName)
.objectAttributes(ObjectAttributes.CHECKSUM, ObjectAttributes.OBJECT_SIZE, ObjectAttributes.OBJECT_PARTS)
.build();

GetObjectAttributesResponse blobMetadata = s3AsyncClient.getObjectAttributes(getObjectAttributesRequest).join();

final long blobSize = blobMetadata.objectSize();
final int numParts = blobMetadata.objectParts().totalPartsCount();

final List<CompletableFuture<InputStreamContainer>> blobInputStreamsFuture = new ArrayList<>();
final List<InputStreamContainer> blobPartStreams = new ArrayList<>();

for (int partNumber = 0; partNumber < numParts; partNumber++) {
int finalPartNumber = partNumber;
blobInputStreamsFuture.add(
blobStore.getAsyncTransferManager()
.getPartInputStream(s3AsyncClient, bucketName, blobName, partNumber)
// TODO: Error handling
.whenComplete((data, error) -> blobPartStreams.add(finalPartNumber, data))
);
}

CompletableFuture.allOf(blobInputStreamsFuture.toArray(CompletableFuture[]::new)).whenComplete((data, error) -> {
if (error != null) {
listener.onFailure(new IOException(error));
} else {
listener.onResponse(new ReadContext(blobSize, blobPartStreams, null));
}
});
}
}
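A note on the part-collection loop above: the per-part futures can complete in any order, and ArrayList.add(index, element) only accepts an index no greater than the current list size, so out-of-order completions can fail. The fragment below is a minimal sketch of one way to keep part ordering stable; it reuses the surrounding method's names, assumes java.util.Arrays is imported, and is illustrative only, not part of this change.

    // Sketch: collect each part into a fixed-size array slot keyed by part number,
    // then assemble the ordered list only after every future has completed.
    InputStreamContainer[] orderedParts = new InputStreamContainer[numParts];
    List<CompletableFuture<Void>> partFutures = new ArrayList<>();
    for (int partNumber = 0; partNumber < numParts; partNumber++) {
        final int slot = partNumber;
        partFutures.add(
            blobStore.getAsyncTransferManager()
                .getPartInputStream(s3AsyncClient, bucketName, blobName, slot)
                .thenAccept(partStream -> orderedParts[slot] = partStream)
        );
    }
    CompletableFuture.allOf(partFutures.toArray(CompletableFuture[]::new)).whenComplete((unused, error) -> {
        if (error != null) {
            listener.onFailure(new IOException(error));
        } else {
            listener.onResponse(new ReadContext(blobSize, Arrays.asList(orderedParts), null));
        }
    });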

//

// @Override
// public CompletableFuture<ReadContext> asyncBlobDownload(String blobName, boolean forceSingleStream) throws IOException {
// try (AmazonAsyncS3Reference amazonS3Reference = SocketAccess.doPrivileged(blobStore::asyncClientReference)) {
// S3AsyncClient s3AsyncClient = amazonS3Reference.get().client();
//
// GetObjectAttributesRequest getObjectAttributesRequest = GetObjectAttributesRequest.builder()
// .bucket(blobStore.bucket())
// .key(blobName)
// .objectAttributes(ObjectAttributes.CHECKSUM, ObjectAttributes.OBJECT_SIZE, ObjectAttributes.OBJECT_PARTS)
// .build();
//
// GetObjectAttributesResponse getObject = s3AsyncClient.getObjectAttributes(getObjectAttributesRequest).join();
//
// final long blobSize = getObject.objectSize();
// final List<CompletableFuture<InputStream>> blobInputStreamsFuture = new ArrayList<>();
// final List<InputStream> blobInputStreams = new ArrayList<>();
// final int numStreams;
//
// if (forceSingleStream) {
// numStreams = 1;
// DownloadRequest downloadRequest = new DownloadRequest(blobStore.bucket(), blobName, blobSize);
// blobInputStreamsFuture.add(blobStore.getAsyncTransferManager().downloadObjectFutureStream(s3AsyncClient, downloadRequest)
// // TODO: Add error handling
// .whenComplete((data, error) -> blobInputStreams.add(data)));
// } else {
// final long optimalStreamSize = 8 * 1024 * 1024; // TODO: Replace this with configurable value
// numStreams = (int) Math.ceil(blobSize * 1.0 / optimalStreamSize);
//
// for (int streamNumber = 0; streamNumber < numStreams; streamNumber++) {
// long start = streamNumber * optimalStreamSize;
// long end = Math.min(blobSize, (streamNumber + 1) * optimalStreamSize) - 1;
// DownloadRequest downloadRequest = new DownloadRequest(blobStore.bucket(), blobName, blobSize, start, end);
// blobInputStreamsFuture.add(blobStore.getAsyncTransferManager().downloadObjectFutureStream(s3AsyncClient, downloadRequest)
// // TODO: Add error handling
// .whenComplete((data, error) -> blobInputStreams.add(data)));
// }
// }
//
// // TODO: Add error handling
// return CompletableFuture.allOf(blobInputStreamsFuture.toArray(new CompletableFuture[0]))
// .thenCompose(ignored -> CompletableFuture.supplyAsync(() -> new ReadContext(blobInputStreams, null, numStreams, blobSize)));
// }
// }

// package private for testing
long getLargeBlobThresholdInBytes() {
return blobStore.bufferSizeInBytes();
@@ -16,12 +16,16 @@
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.exception.CorruptFileException;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.util.ByteUtils;
import org.opensearch.repositories.s3.io.CheckedContainer;
import org.opensearch.repositories.s3.SocketAccess;
import org.opensearch.repositories.s3.utils.HttpRangeUtils;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.http.HttpStatusCode;
import software.amazon.awssdk.services.s3.S3AsyncClient;
@@ -33,6 +37,8 @@
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.utils.CompletableFutureUtils;
@@ -102,6 +108,29 @@ public CompletableFuture<Void> uploadObject(S3AsyncClient s3AsyncClient, UploadR
return returnFuture;
}

public CompletableFuture<InputStreamContainer> getPartInputStream(
S3AsyncClient s3AsyncClient,
String bucketName,
String blobName,
int partNumber
) {
GetObjectRequest.Builder getObjectRequestBuilder = GetObjectRequest.builder()
.bucket(bucketName)
.key(blobName)
.partNumber(partNumber);

return SocketAccess.doPrivileged(
() -> s3AsyncClient.getObject(getObjectRequestBuilder.build(), AsyncResponseTransformer.toBlockingInputStream())
.thenApply(this::transformResponseToInputStreamContainer)
);
}

private InputStreamContainer transformResponseToInputStreamContainer(ResponseInputStream<GetObjectResponse> streamResponse) {
GetObjectResponse getObjectResponse = streamResponse.response();
final Tuple<Long, Long> s3ResponseRange = HttpRangeUtils.fromHttpRangeHeader(getObjectResponse.contentRange());
return new InputStreamContainer(streamResponse, getObjectResponse.contentLength(), s3ResponseRange.v1());
}
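The Content-Range header on a ranged or part GET response has the form "bytes <start>-<end>/<total>", and fromHttpRangeHeader is used above to recover the part's start offset. Below is a rough sketch of the parsing it is assumed to perform; the actual HttpRangeUtils implementation may differ.

    // Hypothetical illustration only: parse "bytes 0-8388607/26214400" into a (start, end) pair.
    static Tuple<Long, Long> parseContentRange(String contentRange) {
        String range = contentRange.substring("bytes ".length()).split("/")[0]; // "0-8388607"
        String[] bounds = range.split("-");
        return new Tuple<>(Long.parseLong(bounds[0]), Long.parseLong(bounds[1]));
    }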

private void uploadInParts(
S3AsyncClient s3AsyncClient,
UploadRequest uploadRequest,
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.repositories.s3.async;

/**
* A model encapsulating all details for a download from S3
*/
public class DownloadRequest {
private final String bucket;
private final String key;
private final int partNumber;

public DownloadRequest(String bucket, String key, int partNumber) {
this.bucket = bucket;
this.key = key;
this.partNumber = partNumber;
}

public String getBucket() {
return bucket;
}

public String getKey() {
return key;
}

public int getPartNumber() {
return partNumber;
}

}
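A construction example for the model above; the bucket name and key shown are placeholders, not values used elsewhere in this change.

    // Hypothetical usage: describe a request for part 3 of a blob in the repository bucket.
    DownloadRequest request = new DownloadRequest("snapshot-bucket", "indices/0/0/__segments_1", 3);
    assert request.getPartNumber() == 3;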
@@ -85,7 +85,7 @@ private Map<String, Long> indexData(int numberOfIterations, boolean invokeFlush)
}
maxSeqNoRefreshedOrFlushed = maxSeqNo;
refreshedOrFlushedOperations = totalOperations;
int numberOfOperations = randomIntBetween(20, 50);
int numberOfOperations = randomIntBetween(1, 5);
for (int j = 0; j < numberOfOperations; j++) {
IndexResponse response = indexSingleDoc();
maxSeqNo = response.getSeqNo();
@@ -112,7 +112,7 @@ private void verifyRestoredData(Map<String, Long> indexStats, boolean checkTotal
assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(statsGranularity) + 1);
}

private void testRestoreFlow(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush) throws IOException {
protected void testRestoreFlow(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush) throws IOException {
internalCluster().startDataOnlyNodes(3);
if (remoteTranslog) {
createIndex(INDEX_NAME, remoteTranslogIndexSettings(0));
@@ -129,12 +129,10 @@ private void testRestoreFlow(boolean remoteTranslog, int numberOfIterations, boo

client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());
ensureGreen(INDEX_NAME);
// assertEquals(indexStats.get(TOTAL_OPERATIONS).longValue(),
// client().prepareSearch(INDEX_NAME).setSize(0).get().getInternalResponse().hits().getTotalHits().value);
verifyRestoredData(indexStats, remoteTranslog);

if (remoteTranslog) {
verifyRestoredData(indexStats, true);
} else {
verifyRestoredData(indexStats, false);
}
}

public void testRemoteSegmentStoreRestoreWithNoDataPostCommit() throws IOException {
@@ -13,6 +13,7 @@
import org.opensearch.remotestore.RemoteStoreIT;
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.stream.Collectors;
@@ -29,10 +30,16 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {

@Override
protected void putRepository(Path path) {
logger.error("Repo Path: {}", path);
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME)
.setType(MockFsRepositoryPlugin.TYPE)
.setSettings(Settings.builder().put("location", path))
);
}

@Override
public void testRemoteSegmentStoreRestoreWithNoDataPostCommit() throws IOException {
testRestoreFlow(true, 2, true);
}
}
@@ -8,9 +8,12 @@

package org.opensearch.remotestore.multipart.mocks;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.CorruptIndexException;
import org.opensearch.action.ActionListener;
import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.BlobPath;
@@ -24,6 +27,10 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -34,14 +41,17 @@ public class MockFsVerifyingBlobContainer extends FsBlobContainer implements Ver

private final boolean triggerDataIntegrityFailure;

private static Set<String> triedBlobs = new HashSet<>();

private static final Logger logger = LogManager.getLogger(MockFsVerifyingBlobContainer.class);

public MockFsVerifyingBlobContainer(FsBlobStore blobStore, BlobPath blobPath, Path path, boolean triggerDataIntegrityFailure) {
super(blobStore, blobPath, path);
this.triggerDataIntegrityFailure = triggerDataIntegrityFailure;
}

@Override
public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException {

int nParts = 10;
long partSize = writeContext.getFileSize() / nParts;
StreamContext streamContext = writeContext.getStreamProvider(partSize);
@@ -114,6 +124,27 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp

}

@Override
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
new Thread(() -> {
try {
long contentLength = listBlobs().get(blobName).length();
long partSize = contentLength / 10;
int numberOfParts = (int) ((contentLength % partSize) == 0 ? contentLength / partSize : (contentLength / partSize) + 1);
List<InputStreamContainer> blobPartStreams = new ArrayList<>();
for (int partNumber = 0; partNumber < numberOfParts; partNumber++) {
long offset = partNumber * partSize;
InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset);
blobPartStreams.add(blobPartStream);
}
ReadContext blobReadContext = new ReadContext(contentLength, blobPartStreams, null);
listener.onResponse(blobReadContext);
} catch (Exception e) {
listener.onFailure(e);
}
}).start();
}
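The mock above fixes the split at ten equal-sized slices, so the final slice of a blob whose length is not a multiple of ten is shorter than partSize, and a blob smaller than ten bytes would make partSize zero. The fragment below is an illustrative sketch of part-size arithmetic that covers both edge cases; variable names follow the mock, and it is not an implemented change.

            // Sketch: clamp partSize to at least one byte and give the last part only the remaining bytes.
            long partSize = Math.max(1, contentLength / 10);
            int numberOfParts = (int) ((contentLength + partSize - 1) / partSize);
            List<InputStreamContainer> blobPartStreams = new ArrayList<>();
            for (int partNumber = 0; partNumber < numberOfParts; partNumber++) {
                long offset = partNumber * partSize;
                long length = Math.min(partSize, contentLength - offset);
                blobPartStreams.add(new InputStreamContainer(readBlob(blobName, offset, length), length, offset));
            }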

private boolean isSegmentFile(String filename) {
return !filename.endsWith(".tlog") && !filename.endsWith(".ckp");
}
@@ -9,9 +9,14 @@
package org.opensearch.common.blobstore;

import org.opensearch.action.ActionListener;
import org.opensearch.common.blobstore.stream.read.listener.ReadContextListener;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;

/**
* An extension of {@link BlobContainer} that adds {@link VerifyingMultiStreamBlobContainer#asyncBlobUpload} to allow
@@ -31,4 +36,27 @@ public interface VerifyingMultiStreamBlobContainer extends BlobContainer {
* @throws IOException if any of the input streams could not be read, or the target blob could not be written to
*/
void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException;

/**
* Asynchronously fetches the blob from the container and completes the listener with a {@link ReadContext}
* describing the blob: its overall size and the {@link InputStream}s for each of its parts.
* @param blobName The name of the blob to read.
* @param listener Listener completed with the {@link ReadContext} for the blob, or with the failure if the read cannot be started.
*/
void readBlobAsync(String blobName, ActionListener<ReadContext> listener);

default void asyncBlobDownload(
String blobName,
Path segmentFileLocation,
ThreadPool threadPool,
ActionListener<String> segmentCompletionListener
) {
ReadContextListener readContextListener = new ReadContextListener(
blobName,
segmentFileLocation,
threadPool,
segmentCompletionListener
);
readBlobAsync(blobName, readContextListener);
}
}
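A call-site sketch for the default helper above, assuming the caller already holds a container, a thread pool, and a target path. It also assumes the listener is completed with the downloaded file's name by ReadContextListener, and that org.apache.logging.log4j.LogManager is imported; names here are illustrative only.

    // Hypothetical caller: download a blob to a local file and log the outcome.
    static void downloadToFile(VerifyingMultiStreamBlobContainer container, String blobName, Path target, ThreadPool threadPool) {
        container.asyncBlobDownload(blobName, target, threadPool, ActionListener.wrap(
            name -> LogManager.getLogger("blob-download").info("Completed download of {}", name),
            e -> LogManager.getLogger("blob-download").error("Download of " + blobName + " failed", e)
        ));
    }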