Skip to content

Commit

Permalink
Implementation of RemoteObject for objects to uploaded to remote store
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
  • Loading branch information
soosinha committed May 27, 2024
1 parent 66df930 commit ee52329
Show file tree
Hide file tree
Showing 10 changed files with 1,207 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.common.io.Streams;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;
import org.opensearch.threadpool.ThreadPool;

public class RemoteClusterBlocks extends AbstractRemoteBlobStoreObject<ClusterBlocks> {

public static final String CLUSTER_BLOCKS = "blocks";
public static final ChecksumBlobStoreFormat<ClusterBlocks> CLUSTER_BLOCKS_FORMAT = new ChecksumBlobStoreFormat<>(
"blocks",
METADATA_NAME_FORMAT,
ClusterBlocks::fromXContent
);

private ClusterBlocks clusterBlocks;
private long stateVersion;
private String blobName;
private final String clusterUUID;

public RemoteClusterBlocks(ClusterBlocks clusterBlocks, long stateVersion, String clusterUUID, BlobStoreTransferService blobStoreTransferService,
BlobStoreRepository blobStoreRepository, String clusterName,
ThreadPool threadPool) {
super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool);
this.clusterBlocks = clusterBlocks;
this.stateVersion = stateVersion;
this.clusterUUID = clusterUUID;
}

public RemoteClusterBlocks(String blobName, String clusterUUID, BlobStoreTransferService blobStoreTransferService, BlobStoreRepository blobStoreRepository,
String clusterName,
ThreadPool threadPool) {
super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool);
this.blobName = blobName;
this.clusterUUID = clusterUUID;
}

@Override
public BlobPathParameters getBlobPathParameters() {
return new BlobPathParameters(List.of("transient"), CLUSTER_BLOCKS);
}

@Override
public String getFullBlobName() {
return blobName;
}

@Override
public String generateBlobFileName() {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/transient/<componentPrefix>__<inverted_state_version>__<inverted__timestamp>__<codec_version>
String blobFileName = String.join(
DELIMITER,
getBlobPathParameters().getFilePrefix(),
RemoteStoreUtils.invertLong(stateVersion),
RemoteStoreUtils.invertLong(System.currentTimeMillis()),
String.valueOf(CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION)
);
// setting the full blob path with name for future access
this.blobName = getBlobPathForUpload().buildAsString() + blobFileName;
return blobFileName;
}

@Override
public UploadedMetadata getUploadedMetadata() {
assert blobName != null;
return new UploadedMetadataAttribute(CLUSTER_BLOCKS, blobName);
}

@Override
public ClusterBlocks get() {
return clusterBlocks;
}

@Override
public String clusterUUID() {
return clusterUUID;
}

@Override
public InputStream serialize() throws IOException {
return CLUSTER_BLOCKS_FORMAT.serialize(clusterBlocks, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput();
}

@Override
public ClusterBlocks deserialize(InputStream inputStream) throws IOException {
return CLUSTER_BLOCKS_FORMAT.deserialize(blobName, getBlobStoreRepository().getNamedXContentRegistry(), Streams.readFully(inputStream));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import org.opensearch.common.io.Streams;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;
import org.opensearch.threadpool.ThreadPool;

public class RemoteClusterMetadataManifest extends AbstractRemoteBlobStoreObject<ClusterMetadataManifest> {

public static final String MANIFEST_PATH_TOKEN = "manifest";
public static final int SPLITTED_MANIFEST_FILE_LENGTH = 6;

public static final String MANIFEST_FILE_PREFIX = "manifest";
public static final String METADATA_MANIFEST_NAME_FORMAT = "%s";
public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V3;

/**
* Manifest format compatible with older codec v0, where codec version was missing.
*/
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT_V0 =
new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV0);
/**
* Manifest format compatible with older codec v1, where global metadata was missing.
*/
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT_V1 =
new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV1);

/**
* Manifest format compatible with codec v2, where we introduced codec versions/global metadata.
*/
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT = new ChecksumBlobStoreFormat<>(
"cluster-metadata-manifest",
METADATA_MANIFEST_NAME_FORMAT,
ClusterMetadataManifest::fromXContent
);

private ClusterMetadataManifest clusterMetadataManifest;
private String blobName;
private final String clusterUUID;

public RemoteClusterMetadataManifest(ClusterMetadataManifest clusterMetadataManifest, String clusterUUID, BlobStoreTransferService blobStoreTransferService,
BlobStoreRepository blobStoreRepository, String clusterName,
ThreadPool threadPool) {
super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool);
this.clusterMetadataManifest = clusterMetadataManifest;
this.clusterUUID = clusterUUID;
}

public RemoteClusterMetadataManifest(String blobName, String clusterUUID, BlobStoreTransferService blobStoreTransferService,
BlobStoreRepository blobStoreRepository, String clusterName,
ThreadPool threadPool) {
super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool);
this.blobName = blobName;
this.clusterUUID = clusterUUID;
}

@Override
public BlobPathParameters getBlobPathParameters() {
return new BlobPathParameters(List.of(MANIFEST_PATH_TOKEN), MANIFEST_FILE_PREFIX);
}

@Override
public String getFullBlobName() {
return blobName;
}

@Override
public String generateBlobFileName() {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest__<inverted_term>__<inverted_version>__C/P__<inverted__timestamp>__
// <codec_version>
String blobFileName = String.join(
DELIMITER,
MANIFEST_PATH_TOKEN,
RemoteStoreUtils.invertLong(clusterMetadataManifest.getClusterTerm()),
RemoteStoreUtils.invertLong(clusterMetadataManifest.getStateVersion()),
(clusterMetadataManifest.isCommitted() ? "C" : "P"), // C for committed and P for published
RemoteStoreUtils.invertLong(System.currentTimeMillis()),
String.valueOf(clusterMetadataManifest.getCodecVersion()) // Keep the codec version at last place only, during read we reads last place to
// determine codec version.
);
// setting the full blob path with name for future access
this.blobName = getBlobPathForUpload().buildAsString() + blobFileName;
return blobFileName;
}

@Override
public UploadedMetadata getUploadedMetadata() {
return new UploadedMetadataAttribute(MANIFEST_PATH_TOKEN, blobName);
}

@Override
public ClusterMetadataManifest get() {
return clusterMetadataManifest;
}

@Override
public String clusterUUID() {
return clusterUUID;
}

@Override
public InputStream serialize() throws IOException {
return CLUSTER_METADATA_MANIFEST_FORMAT.serialize(clusterMetadataManifest, generateBlobFileName(), getCompressor(),
RemoteClusterStateUtils.FORMAT_PARAMS).streamInput();
}

@Override
public ClusterMetadataManifest deserialize(InputStream inputStream) throws IOException {
ChecksumBlobStoreFormat<ClusterMetadataManifest> blobStoreFormat = getClusterMetadataManifestBlobStoreFormat();
return blobStoreFormat.deserialize(blobName, getBlobStoreRepository().getNamedXContentRegistry(), Streams.readFully(inputStream));
}

private int getManifestCodecVersion() {
assert blobName != null;
String[] splitName = blobName.split(DELIMITER);
if (splitName.length == SPLITTED_MANIFEST_FILE_LENGTH) {
return Integer.parseInt(splitName[splitName.length - 1]); // Last value would be codec version.
} else if (splitName.length < SPLITTED_MANIFEST_FILE_LENGTH) { // Where codec is not part of file name, i.e. default codec version 0
// is used.
return ClusterMetadataManifest.CODEC_V0;
} else {
throw new IllegalArgumentException("Manifest file name is corrupted");
}
}

private ChecksumBlobStoreFormat<ClusterMetadataManifest> getClusterMetadataManifestBlobStoreFormat() {
long codecVersion = getManifestCodecVersion();
if (codecVersion == MANIFEST_CURRENT_CODEC_VERSION) {
return CLUSTER_METADATA_MANIFEST_FORMAT;
} else if (codecVersion == ClusterMetadataManifest.CODEC_V1) {
return CLUSTER_METADATA_MANIFEST_FORMAT_V1;
} else if (codecVersion == ClusterMetadataManifest.CODEC_V0) {
return CLUSTER_METADATA_MANIFEST_FORMAT_V0;
}
throw new IllegalArgumentException("Cluster metadata manifest file is corrupted, don't have valid codec version");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import java.util.Locale;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RemoteClusterStateUtils {
public static final String METADATA_NAME_FORMAT = "%s.dat";
public static final String METADATA_NAME_PLAIN_FORMAT = "%s";
public static final String METADATA_FILE_PREFIX = "metadata";
public static final String CLUSTER_STATE_PATH_TOKEN = "cluster-state";
public static final String DELIMITER = "__";
public static final String PATH_DELIMITER = "/";

// ToXContent Params with gateway mode.
// We are using gateway context mode to persist all custom metadata.
public static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Map.of(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_GATEWAY));

public static BlobPath getCusterMetadataBasePath(BlobStoreRepository blobStoreRepository, String clusterName, String clusterUUID) {
return blobStoreRepository.basePath().add(encodeString(clusterName)).add(CLUSTER_STATE_PATH_TOKEN).add(clusterUUID);
}

public static String encodeString(String content) {
return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8));
}

public static String getFormattedFileName(String fileName, int codecVersion) {
if (codecVersion < ClusterMetadataManifest.CODEC_V3) {
return String.format(Locale.ROOT, METADATA_NAME_FORMAT, fileName);
}
return fileName;
}

static BlobContainer clusterUUIDContainer(BlobStoreRepository blobStoreRepository, String clusterName) {
return blobStoreRepository.blobStore()
.blobContainer(
blobStoreRepository.basePath()
.add(Base64.getUrlEncoder().withoutPadding().encodeToString(clusterName.getBytes(StandardCharsets.UTF_8)))
.add(CLUSTER_STATE_PATH_TOKEN)
);
}

/**
* Exception for Remote state transfer.
*/
public static class RemoteStateTransferException extends RuntimeException {

public RemoteStateTransferException(String errorDesc) {
super(errorDesc);
}

public RemoteStateTransferException(String errorDesc, Throwable cause) {
super(errorDesc, cause);
}
}

public static class UploadedMetadataResults {
List<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndexMetadata;
Map<String, ClusterMetadataManifest.UploadedMetadataAttribute> uploadedCustomMetadataMap;
ClusterMetadataManifest.UploadedMetadataAttribute uploadedCoordinationMetadata;
ClusterMetadataManifest.UploadedMetadataAttribute uploadedSettingsMetadata;
ClusterMetadataManifest.UploadedMetadataAttribute uploadedTemplatesMetadata;
ClusterMetadataManifest.UploadedMetadataAttribute uploadedDiscoveryNodes;
ClusterMetadataManifest.UploadedMetadataAttribute uploadedClusterBlocks;
List<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndicesRoutingMetadata;

public UploadedMetadataResults(
List<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndexMetadata,
Map<String, ClusterMetadataManifest.UploadedMetadataAttribute> uploadedCustomMetadataMap,
ClusterMetadataManifest.UploadedMetadataAttribute uploadedCoordinationMetadata,
ClusterMetadataManifest.UploadedMetadataAttribute uploadedSettingsMetadata,
ClusterMetadataManifest.UploadedMetadataAttribute uploadedTemplatesMetadata,
ClusterMetadataManifest.UploadedMetadataAttribute uploadedDiscoveryNodes,
ClusterMetadataManifest.UploadedMetadataAttribute uploadedClusterBlocks,
List<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndicesRoutingMetadata
) {
this.uploadedIndexMetadata = uploadedIndexMetadata;
this.uploadedCustomMetadataMap = uploadedCustomMetadataMap;
this.uploadedCoordinationMetadata = uploadedCoordinationMetadata;
this.uploadedSettingsMetadata = uploadedSettingsMetadata;
this.uploadedTemplatesMetadata = uploadedTemplatesMetadata;
this.uploadedDiscoveryNodes = uploadedDiscoveryNodes;
this.uploadedClusterBlocks = uploadedClusterBlocks;
this.uploadedIndicesRoutingMetadata = uploadedIndicesRoutingMetadata;
}

public UploadedMetadataResults() {
this.uploadedIndexMetadata = new ArrayList<>();
this.uploadedCustomMetadataMap = new HashMap<>();
this.uploadedCoordinationMetadata = null;
this.uploadedSettingsMetadata = null;
this.uploadedTemplatesMetadata = null;
this.uploadedDiscoveryNodes = null;
this.uploadedClusterBlocks = null;
this.uploadedIndicesRoutingMetadata = new ArrayList<>();
}
}
}
Loading

0 comments on commit ee52329

Please sign in to comment.