From f9b0d4d19fedb356a003910de12dea953ca56948 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Fri, 7 Jun 2024 22:34:51 +0530 Subject: [PATCH] Send manifest file name in remote publish Signed-off-by: Sooraj Sinha --- .../coordination/CoordinationState.java | 5 +- .../cluster/coordination/Coordinator.java | 4 +- .../PublicationTransportHandler.java | 94 +++++++++---------- .../coordination/RemotePublishRequest.java | 17 +++- .../cluster/node/DiscoveryNode.java | 16 ++-- .../common/settings/ClusterSettings.java | 4 +- .../opensearch/discovery/DiscoveryModule.java | 7 +- .../opensearch/gateway/GatewayMetaState.java | 23 +++-- .../remote/RemoteClusterStateService.java | 29 +++--- .../remote/model/RemoteUploadDetails.java | 30 ++++++ .../main/java/org/opensearch/node/Node.java | 3 +- .../remotestore/RemoteStoreNodeAttribute.java | 5 + .../coordination/CoordinationStateTests.java | 4 +- .../cluster/coordination/NodeJoinTests.java | 3 +- .../PublicationTransportHandlerTests.java | 5 +- .../discovery/DiscoveryModuleTests.java | 3 +- .../GatewayMetaStatePersistedStateTests.java | 11 ++- .../AbstractCoordinatorTestCase.java | 3 +- 18 files changed, 166 insertions(+), 100 deletions(-) create mode 100644 server/src/main/java/org/opensearch/gateway/remote/model/RemoteUploadDetails.java diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index c0ed49c0bc0c2..f20ea13e7668a 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -50,6 +50,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; @@ -79,6 +80,7 @@ public class CoordinationState { private long lastPublishedVersion; private VotingConfiguration lastPublishedConfiguration; private VoteCollection publishVotes; + private final boolean isRemoteStateEnabled; private final boolean isRemotePublicationEnabled; public CoordinationState( @@ -102,7 +104,8 @@ public CoordinationState( .getLastAcceptedState() .getLastAcceptedConfiguration(); this.publishVotes = new VoteCollection(); - this.isRemotePublicationEnabled = isRemoteStoreClusterStateEnabled(settings) && isRemoteRoutingTableEnabled(settings); + this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings); + this.isRemotePublicationEnabled = RemoteStoreNodeAttribute.isRemotePublicationEnabled(settings); } public boolean isRemotePublicationEnabled() { diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 8b0da49e93fb0..914db31886850 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -49,6 +49,7 @@ import org.opensearch.cluster.coordination.CoordinationState.VoteCollection; import org.opensearch.cluster.coordination.FollowersChecker.FollowerCheckRequest; import org.opensearch.cluster.coordination.JoinHelper.InitialJoinAccumulator; +import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; @@ -1334,7 +1335,8 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId()) final PublicationTransportHandler.PublicationContext publicationContext = publicationHandler.newPublicationContext( clusterChangedEvent, - coordinationState.get().isRemotePublicationEnabled() + coordinationState.get().isRemotePublicationEnabled(), + persistedStateRegistry ); final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index 79a7f32ce2ca0..0aea421ca1dc3 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -31,6 +31,9 @@ package org.opensearch.cluster.coordination; +import java.util.Locale; +import java.util.Optional; +import java.util.function.Supplier; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -40,6 +43,8 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.Diff; import org.opensearch.cluster.IncompatibleClusterStateVersionException; +import org.opensearch.cluster.coordination.CoordinationState.PersistedState; +import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.core.action.ActionListener; @@ -47,6 +52,7 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.gateway.GatewayMetaState.RemotePersistedState; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.threadpool.ThreadPool; @@ -229,50 +235,35 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque } private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException { - final Optional manifestOptional = remoteClusterStateService.getClusterMetadataManifestByTermVersion( - request.getClusterName(), - request.getClusterUUID(), - request.term, - request.version - ); - if (manifestOptional.isPresent() == false) { - throw new IllegalStateException( - String.format(Locale.ROOT, "Manifest is not present for term - %s version - %s", request.term, request.version) - ); + if (transportService.getLocalNode().equals(request.getSourceNode())) { + return acceptStateOnLocalNode(request); } - ClusterMetadataManifest manifest = manifestOptional.get(); + ClusterMetadataManifest manifest = remoteClusterStateService.getClusterMetadataManifestByFileName(request.getClusterUUID(), request.getManifestFile()); boolean applyFullState = false; final ClusterState lastSeen = lastSeenClusterState.get(); if (lastSeen == null) { - logger.debug("Diff cannot be applied as there is no last cluster state"); + logger.debug(() -> "Diff cannot be applied as there is no last cluster state"); applyFullState = true; } else if (manifest.getDiffManifest() == null) { - logger.debug("There is no diff in the manifest"); + logger.trace(() -> "There is no diff in the manifest"); applyFullState = true; } else if (manifest.getDiffManifest().getFromStateUUID().equals(lastSeen.stateUUID()) == false) { - logger.debug("Last cluster state not compatible with the diff"); + logger.debug(() -> "Last cluster state not compatible with the diff"); applyFullState = true; } if (applyFullState == true) { - ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest( - request.getClusterName(), - manifest, - transportService.getLocalNode().getId() - ); - logger.debug("Downloaded full cluster state [{}]", clusterState); + logger.debug(() -> new ParameterizedMessage("Downloading full cluster state for term {}, version {}, stateUUID {}", manifest.getClusterTerm(), manifest.getStateVersion(), + manifest.getStateUUID())); + ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(request.getClusterName(), manifest, transportService.getLocalNode().getId(), true); fullClusterStateReceivedCount.incrementAndGet(); final PublishWithJoinResponse response = acceptState(clusterState); lastSeenClusterState.set(clusterState); return response; } else { - ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff( - request.getClusterName(), - manifest, - lastSeenClusterState.get(), - transportService.getLocalNode().getId() - ); - logger.debug("Downloaded full cluster state from diff [{}]", clusterState); + logger.debug(() -> new ParameterizedMessage("Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}", manifest.getClusterTerm(), + manifest.getStateVersion(), manifest.getDiffManifest().getFromStateUUID(), manifest.getStateUUID())); + ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(request.getClusterName(), manifest, lastSeen, transportService.getLocalNode().getId()); compatibleClusterStateDiffReceivedCount.incrementAndGet(); final PublishWithJoinResponse response = acceptState(clusterState); lastSeenClusterState.compareAndSet(lastSeen, clusterState); @@ -293,8 +284,20 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) { return handlePublishRequest.apply(new PublishRequest(incomingState)); } - public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemotePublicationEnabled) { - final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent, isRemotePublicationEnabled); + private PublishWithJoinResponse acceptStateOnLocalNode(RemotePublishRequest remotePublishRequest) { + final PublishRequest publishRequest = currentPublishRequestToSelf.get(); + if (publishRequest == null || publishRequest.getAcceptedState().coordinationMetadata().term() != remotePublishRequest.term + || publishRequest.getAcceptedState().version() != remotePublishRequest.version) { + throw new IllegalStateException("publication to self failed for " + remotePublishRequest); + } + PublishWithJoinResponse publishWithJoinResponse = handlePublishRequest.apply(publishRequest); + lastSeenClusterState.set(publishRequest.getAcceptedState()); + return publishWithJoinResponse; + } + + public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemotePublicationEnabled, + PersistedStateRegistry persistedStateRegistry) { + final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent, isRemotePublicationEnabled, persistedStateRegistry); // Build the serializations we expect to need now, early in the process, so that an error during serialization fails the publication // straight away. This isn't watertight since we send diffs on a best-effort basis and may fall back to sending a full state (and @@ -340,13 +343,15 @@ public class PublicationContext { private final Map serializedStates = new HashMap<>(); private final Map serializedDiffs = new HashMap<>(); private final boolean sendRemoteState; + private final PersistedStateRegistry persistedStateRegistry; - PublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemotePublicationEnabled) { + PublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemotePublicationEnabled, PersistedStateRegistry persistedStateRegistry) { discoveryNodes = clusterChangedEvent.state().nodes(); newState = clusterChangedEvent.state(); previousState = clusterChangedEvent.previousState(); sendFullVersion = previousState.getBlocks().disableStatePersistence(); sendRemoteState = isRemotePublicationEnabled; + this.persistedStateRegistry = persistedStateRegistry; } void buildDiffAndSerializeStates() { @@ -410,7 +415,7 @@ public void onFailure(Exception e) { } else { responseActionListener = listener; } - if (sendRemoteState && destination.isRemoteStateNode()) { + if (sendRemoteState && destination.isRemoteClusterStateEnabled() && destination.isRemoteRoutingTableEnabled()) { sendRemoteClusterState(destination, publishRequest.getAcceptedState(), responseActionListener); } else if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) { logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination); @@ -457,25 +462,16 @@ public String executor() { ); } - private void sendRemoteClusterState( - DiscoveryNode destination, - ClusterState clusterState, - ActionListener listener - ) { + private void sendRemoteClusterState(final DiscoveryNode destination, final ClusterState clusterState, final ActionListener listener) { try { - final RemotePublishRequest remotePublishRequest = new RemotePublishRequest( - discoveryNodes.getLocalNode(), - clusterState.term(), - clusterState.getVersion(), - clusterState.getClusterName().value(), - clusterState.metadata().clusterUUID() - ); + final String manifestFileName = ((RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE)).getLastUploadedManifestFile(); + final RemotePublishRequest remotePublishRequest = new RemotePublishRequest(discoveryNodes.getLocalNode(), clusterState.term(), + clusterState.getVersion(), clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifestFileName); final Consumer transportExceptionHandler = exp -> { logger.debug(() -> new ParameterizedMessage("failed to send remote cluster state to {}", destination), exp); listener.onFailure(exp); }; - final TransportResponseHandler responseHandler = new TransportResponseHandler< - PublishWithJoinResponse>() { + final TransportResponseHandler responseHandler = new TransportResponseHandler<>() { @Override public PublishWithJoinResponse read(StreamInput in) throws IOException { @@ -497,13 +493,7 @@ public String executor() { return ThreadPool.Names.GENERIC; } }; - transportService.sendRequest( - destination, - PUBLISH_REMOTE_STATE_ACTION_NAME, - remotePublishRequest, - stateRequestOptions, - responseHandler - ); + transportService.sendRequest(destination, PUBLISH_REMOTE_STATE_ACTION_NAME, remotePublishRequest, stateRequestOptions, responseHandler); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("error sending remote cluster state to {}", destination), e); listener.onFailure(e); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java b/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java index dc770a30a1990..68dea53372c45 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java @@ -16,20 +16,22 @@ public class RemotePublishRequest extends TermVersionRequest { - // todo Do we need cluster name and UUID ? private final String clusterName; private final String clusterUUID; + private final String manifestFile; - public RemotePublishRequest(DiscoveryNode sourceNode, long term, long version, String clusterName, String clusterUUID) { + public RemotePublishRequest(DiscoveryNode sourceNode, long term, long version, String clusterName, String clusterUUID, String manifestFile) { super(sourceNode, term, version); this.clusterName = clusterName; this.clusterUUID = clusterUUID; + this.manifestFile = manifestFile; } public RemotePublishRequest(StreamInput in) throws IOException { super(in); this.clusterName = in.readString(); this.clusterUUID = in.readString(); + this.manifestFile = in.readString(); } @Override @@ -37,6 +39,13 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(clusterName); out.writeString(clusterUUID); + out.writeString(manifestFile); + } + + @Override + public String toString() { + return "RemotePublishRequest{" + "term=" + term + ", version=" + version + ", clusterName=" + clusterName + ", clusterUUID=" + clusterUUID + + ", sourceNode=" + sourceNode + ", manifestFile=" + manifestFile + '}'; } public String getClusterName() { @@ -46,4 +55,8 @@ public String getClusterName() { public String getClusterUUID() { return clusterUUID; } + + public String getManifestFile() { + return manifestFile; + } } diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java index 5ed5294687c40..6cf1f654e3939 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java @@ -33,6 +33,7 @@ package org.opensearch.cluster.node; import org.opensearch.Version; +import org.opensearch.cluster.metadata.Metadata; import org.opensearch.common.UUIDs; import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.settings.Setting; @@ -43,6 +44,7 @@ import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; import org.opensearch.node.Node; import java.io.IOException; @@ -474,20 +476,20 @@ public boolean isRemoteStoreNode() { /** * Returns whether the node is a remote cluster state enabled node. - * @return true if the node contains remote cluster state and remote routing table node attributes, false otherwise + * @return true if the node contains remote cluster state node attribute, false otherwise */ - public boolean isRemoteStateNode() { - return isRemoteClusterStateEnabled() && isRemoteRoutingTableEnabled(); - } - - private boolean isRemoteClusterStateEnabled() { + public boolean isRemoteClusterStateEnabled() { return this.getAttributes() .keySet() .stream() .anyMatch(key -> (key.equals(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY))); } - private boolean isRemoteRoutingTableEnabled() { + /** + * Returns whether remote routing table is enabled on the node + * @return true if the node contains remote routing table node attributes, false otherwise + */ + public boolean isRemoteRoutingTableEnabled() { return this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY)); } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 09f32884e0ae1..3ba6cebbe7350 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -77,6 +77,7 @@ import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; +import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.cluster.service.ClusterManagerService; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; @@ -715,9 +716,6 @@ public void apply(Settings value, Settings current, Settings previous) { // Remote cluster state settings RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING, - RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, - RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, - RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING, RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING, IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING, diff --git a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java index 538dea5b2e60b..922e23b849d49 100644 --- a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java @@ -53,6 +53,7 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.gateway.GatewayMetaState; +import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.monitor.NodeHealthService; import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.plugins.DiscoveryPlugin; @@ -135,7 +136,8 @@ public DiscoveryModule( NodeHealthService nodeHealthService, PersistedStateRegistry persistedStateRegistry, RemoteStoreNodeService remoteStoreNodeService, - ClusterManagerMetrics clusterManagerMetrics + ClusterManagerMetrics clusterManagerMetrics, + RemoteClusterStateService remoteClusterStateService ) { final Collection> joinValidators = new ArrayList<>(); final Map> hostProviders = new HashMap<>(); @@ -214,7 +216,8 @@ public DiscoveryModule( nodeHealthService, persistedStateRegistry, remoteStoreNodeService, - clusterManagerMetrics + clusterManagerMetrics, + remoteClusterStateService ); } else { throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]"); diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index c3056276706a0..1cc0c9cbb6642 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -64,6 +64,7 @@ import org.opensearch.env.NodeMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteClusterStateService; +import org.opensearch.gateway.remote.model.RemoteUploadDetails; import org.opensearch.index.recovery.RemoteStoreRestoreService; import org.opensearch.index.recovery.RemoteStoreRestoreService.RemoteRestoreResult; import org.opensearch.node.Node; @@ -665,6 +666,8 @@ public static class RemotePersistedState implements PersistedState { private ClusterState lastAcceptedState; private ClusterMetadataManifest lastAcceptedManifest; + + private String lastUploadedManifestFile; private final RemoteClusterStateService remoteClusterStateService; private String previousClusterUUID; @@ -690,10 +693,14 @@ public void setCurrentTerm(long currentTerm) { // But for RemotePersistedState, the state is only pushed by the active cluster. So this method is not required. } + public String getLastUploadedManifestFile() { + return lastUploadedManifestFile; + } + @Override public void setLastAcceptedState(ClusterState clusterState) { try { - final ClusterMetadataManifest manifest; + final RemoteUploadDetails manifestDetails; if (shouldWriteFullClusterState(clusterState)) { final Optional latestManifest = remoteClusterStateService.getLatestClusterMetadataManifest( clusterState.getClusterName().value(), @@ -711,15 +718,16 @@ public void setLastAcceptedState(ClusterState clusterState) { clusterState.metadata().clusterUUID() ); } - manifest = remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID); + manifestDetails = remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID); } else { assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) == true : "Previous manifest and previous ClusterState are not in sync"; - manifest = remoteClusterStateService.writeIncrementalMetadata(lastAcceptedState, clusterState, lastAcceptedManifest); + manifestDetails = remoteClusterStateService.writeIncrementalMetadata(lastAcceptedState, clusterState, lastAcceptedManifest); } - assert verifyManifestAndClusterState(manifest, clusterState) == true : "Manifest and ClusterState are not in sync"; - lastAcceptedManifest = manifest; + assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest(), clusterState) == true : "Manifest and ClusterState are not in sync"; + lastAcceptedManifest = manifestDetails.getClusterMetadataManifest(); lastAcceptedState = clusterState; + lastUploadedManifestFile = manifestDetails.getManifestFileName(); } catch (Exception e) { remoteClusterStateService.writeMetadataFailed(); handleExceptionOnWrite(e); @@ -767,12 +775,13 @@ public void markLastAcceptedStateAsCommitted() { metadataBuilder.clusterUUIDCommitted(true); clusterState = ClusterState.builder(lastAcceptedState).metadata(metadataBuilder).build(); } - final ClusterMetadataManifest committedManifest = remoteClusterStateService.markLastStateAsCommitted( + final RemoteUploadDetails committedManifestDetails = remoteClusterStateService.markLastStateAsCommitted( clusterState, lastAcceptedManifest ); - lastAcceptedManifest = committedManifest; + lastAcceptedManifest = committedManifestDetails.getClusterMetadataManifest(); lastAcceptedState = clusterState; + lastUploadedManifestFile = committedManifestDetails.getManifestFileName(); } catch (Exception e) { handleExceptionOnWrite(e); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index d0593dcd51475..798553b72fef0 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -36,6 +36,7 @@ import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.model.RemoteUploadDetails; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.node.Node; @@ -268,7 +269,7 @@ public RemoteClusterStateService( * @return A manifest object which contains the details of uploaded entity metadata. */ @Nullable - public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, String previousClusterUUID) throws IOException { + public RemoteUploadDetails writeFullMetadata(ClusterState clusterState, String previousClusterUUID) throws IOException { final long startTimeNanos = relativeTimeNanosSupplier.getAsLong(); if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { logger.error("Local node is not elected cluster manager. Exiting"); @@ -284,7 +285,7 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri true, true ); - final ClusterMetadataManifest manifest = uploadManifest( + final RemoteUploadDetails manifestDetails = uploadManifest( clusterState, uploadedMetadataResults.uploadedIndexMetadata, previousClusterUUID, @@ -311,7 +312,7 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri uploadedMetadataResults.uploadedIndexMetadata.size() ); } - return manifest; + return manifestDetails; } /** @@ -322,7 +323,7 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri * @return The uploaded ClusterMetadataManifest file */ @Nullable - public ClusterMetadataManifest writeIncrementalMetadata( + public RemoteUploadDetails writeIncrementalMetadata( ClusterState previousClusterState, ClusterState clusterState, ClusterMetadataManifest previousManifest @@ -404,7 +405,7 @@ public ClusterMetadataManifest writeIncrementalMetadata( customsToBeDeletedFromRemote.keySet().forEach(allUploadedCustomMap::remove); indicesToBeDeletedFromRemote.keySet().forEach(allUploadedIndexMetadata::remove); - final ClusterMetadataManifest manifest = uploadManifest( + final RemoteUploadDetails manifestDetails = uploadManifest( clusterState, new ArrayList<>(allUploadedIndexMetadata.values()), previousManifest.getPreviousClusterUUID(), @@ -422,7 +423,7 @@ public ClusterMetadataManifest writeIncrementalMetadata( remoteStateStats.stateTook(durationMillis); ParameterizedMessage clusterStateUploadTimeMessage = new ParameterizedMessage( CLUSTER_STATE_UPLOAD_TIME_LOG_STRING, - manifest.getStateVersion(), + manifestDetails.getClusterMetadataManifest().getStateVersion(), durationMillis ); ParameterizedMessage metadataUpdateMessage = new ParameterizedMessage( @@ -444,7 +445,7 @@ public ClusterMetadataManifest writeIncrementalMetadata( } else { logger.info("{}; {}", clusterStateUploadTimeMessage, metadataUpdateMessage); } - return manifest; + return manifestDetails; } private UploadedMetadataResults writeMetadataInParallel( @@ -724,7 +725,7 @@ public RemoteClusterStateCleanupManager getCleanupManager() { } @Nullable - public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataManifest previousManifest) + public RemoteUploadDetails markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataManifest previousManifest) throws IOException { assert clusterState != null : "Last accepted cluster state is not set"; if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { @@ -732,7 +733,7 @@ public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterStat return null; } assert previousManifest != null : "Last cluster metadata manifest is not set"; - ClusterMetadataManifest committedManifest = uploadManifest( + RemoteUploadDetails committedManifestDetails = uploadManifest( clusterState, previousManifest.getIndices(), previousManifest.getPreviousClusterUUID(), @@ -742,11 +743,11 @@ public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterStat previousManifest.getCustomMetadataMap(), true ); - if (!previousManifest.isClusterUUIDCommitted() && committedManifest.isClusterUUIDCommitted()) { - remoteClusterStateCleanupManager.deleteStaleClusterUUIDs(clusterState, committedManifest); + if (!previousManifest.isClusterUUIDCommitted() && committedManifestDetails.getClusterMetadataManifest().isClusterUUIDCommitted()) { + remoteClusterStateCleanupManager.deleteStaleClusterUUIDs(clusterState, committedManifestDetails.getClusterMetadataManifest()); } - return committedManifest; + return committedManifestDetails; } @Override @@ -773,7 +774,7 @@ public void start() { this.remoteRoutingTableService.ifPresent(RemoteRoutingTableService::start); } - private ClusterMetadataManifest uploadManifest( + private RemoteUploadDetails uploadManifest( ClusterState clusterState, List uploadedIndexMetadata, String previousClusterUUID, @@ -812,7 +813,7 @@ private ClusterMetadataManifest uploadManifest( new ArrayList<>() ); writeMetadataManifest(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifest, manifestFileName); - return manifest; + return new RemoteUploadDetails(manifest, manifestFileName); } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteUploadDetails.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteUploadDetails.java new file mode 100644 index 0000000000000..100280c454e43 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteUploadDetails.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.gateway.remote.ClusterMetadataManifest; + +public class RemoteUploadDetails { + + private final ClusterMetadataManifest clusterMetadataManifest; + private final String manifestFileName; + + public RemoteUploadDetails(final ClusterMetadataManifest manifest, final String manifestFileName) { + this.clusterMetadataManifest = manifest; + this.manifestFileName = manifestFileName; + } + + public ClusterMetadataManifest getClusterMetadataManifest() { + return clusterMetadataManifest; + } + + public String getManifestFileName() { + return manifestFileName; + } +} diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index cb1f2caa082fc..e193a988f3d48 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1200,7 +1200,8 @@ protected Node( fsHealthService, persistedStateRegistry, remoteStoreNodeService, - clusterManagerMetrics + clusterManagerMetrics, + remoteClusterStateService ); final SearchPipelineService searchPipelineService = new SearchPipelineService( clusterService, diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java index a0f745a4270c4..05714b4791d06 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java @@ -201,6 +201,11 @@ private static boolean isRemoteRoutingTableAttributePresent(Settings settings) { .isEmpty() == false; } + public static boolean isRemotePublicationEnabled(Settings settings) { + return FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL) && isRemoteRoutingTableAttributePresent(settings) + && isRemoteStoreClusterStateEnabled(settings); + } + public static boolean isRemoteRoutingTableEnabled(Settings settings) { return FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL) && isRemoteRoutingTableAttributePresent(settings); } diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java index bd71aecf89101..4ade19b3d7219 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java @@ -48,6 +48,7 @@ import org.opensearch.gateway.GatewayMetaState.RemotePersistedState; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteClusterStateService; +import org.opensearch.gateway.remote.model.RemoteUploadDetails; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.EqualsHashCodeTestUtils; import org.opensearch.test.OpenSearchTestCase; @@ -944,7 +945,8 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep .previousClusterUUID(randomAlphaOfLength(10)) .clusterUUIDCommitted(true) .build(); - Mockito.when(remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID)).thenReturn(manifest); + Mockito.when(remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID)) + .thenReturn(new RemoteUploadDetails(manifest, "path/to/manifest")); final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, ps1); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java index f84f0326f4a9d..3a7988bcd2bda 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java @@ -274,7 +274,8 @@ protected void onSendRequest( nodeHealthService, persistedStateRegistry, Mockito.mock(RemoteStoreNodeService.class), - new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE), + null ); transportService.start(); transportService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java index 6d94054afdea2..a9950d82bfac8 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java @@ -76,7 +76,8 @@ public void testDiffSerializationFailure() { transportService, writableRegistry(), pu -> null, - (pu, l) -> {} + (pu, l) -> {}, + null ); transportService.start(); transportService.acceptIncomingRequests(); @@ -111,7 +112,7 @@ public void writeTo(StreamOutput out) throws IOException { OpenSearchException e = expectThrows( OpenSearchException.class, - () -> handler.newPublicationContext(new ClusterChangedEvent("test", unserializableClusterState, clusterState)) + () -> handler.newPublicationContext(new ClusterChangedEvent("test", unserializableClusterState, clusterState), false, null) ); assertNotNull(e.getCause()); assertThat(e.getCause(), instanceOf(IOException.class)); diff --git a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java index 5539b3237c2bf..cd3e8af2a3cd1 100644 --- a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java +++ b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java @@ -131,7 +131,8 @@ private DiscoveryModule newModule(Settings settings, List plugi null, new PersistedStateRegistry(), remoteStoreNodeService, - new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE), + null ); } diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index 418e6d8de6adb..1d63caafbb4dd 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -69,6 +69,7 @@ import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.gateway.remote.RemotePersistenceStats; +import org.opensearch.gateway.remote.model.RemoteUploadDetails; import org.opensearch.index.recovery.RemoteStoreRestoreService; import org.opensearch.index.recovery.RemoteStoreRestoreService.RemoteRestoreResult; import org.opensearch.index.remote.RemoteIndexPathUploader; @@ -723,9 +724,10 @@ public void testRemotePersistedState() throws IOException { final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); final ClusterMetadataManifest manifest = ClusterMetadataManifest.builder().clusterTerm(1L).stateVersion(5L).build(); final String previousClusterUUID = "prev-cluster-uuid"; - Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any())).thenReturn(manifest); + Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any())).thenReturn(new RemoteUploadDetails(manifest, "path/to/manifest")); - Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(manifest); + Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any())) + .thenReturn(new RemoteUploadDetails(manifest, "path/to/manifest")); CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService, previousClusterUUID); assertThat(remotePersistedState.getLastAcceptedState(), nullValue()); @@ -779,9 +781,10 @@ public void testRemotePersistedStateNotCommitted() throws IOException { .build(); Mockito.when(remoteClusterStateService.getLatestClusterMetadataManifest(Mockito.any(), Mockito.any())) .thenReturn(Optional.of(manifest)); - Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any())).thenReturn(manifest); + Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any())).thenReturn(new RemoteUploadDetails(manifest, "path/to/manifest")); - Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(manifest); + Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any())) + .thenReturn(new RemoteUploadDetails(manifest, "path/to/manifest")); CoordinationState.PersistedState remotePersistedState = new RemotePersistedState( remoteClusterStateService, ClusterState.UNKNOWN_UUID diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java index 1c2270bab1260..b432e5411404e 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -1184,7 +1184,8 @@ protected Optional getDisruptableMockTransport(Transpo nodeHealthService, persistedStateRegistry, remoteStoreNodeService, - new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE), + null ); clusterManagerService.setClusterStatePublisher(coordinator); final GatewayService gatewayService = new GatewayService(