Skip to content

Commit

Permalink
Send manifest file name in remote publish
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
  • Loading branch information
soosinha committed Jun 7, 2024
1 parent 3f677bd commit f9b0d4d
Show file tree
Hide file tree
Showing 18 changed files with 166 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;

import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
Expand Down Expand Up @@ -79,6 +80,7 @@ public class CoordinationState {
private long lastPublishedVersion;
private VotingConfiguration lastPublishedConfiguration;
private VoteCollection publishVotes;
private final boolean isRemoteStateEnabled;
private final boolean isRemotePublicationEnabled;

public CoordinationState(
Expand All @@ -102,7 +104,8 @@ public CoordinationState(
.getLastAcceptedState()
.getLastAcceptedConfiguration();
this.publishVotes = new VoteCollection();
this.isRemotePublicationEnabled = isRemoteStoreClusterStateEnabled(settings) && isRemoteRoutingTableEnabled(settings);
this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings);
this.isRemotePublicationEnabled = RemoteStoreNodeAttribute.isRemotePublicationEnabled(settings);
}

public boolean isRemotePublicationEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.cluster.coordination.CoordinationState.VoteCollection;
import org.opensearch.cluster.coordination.FollowersChecker.FollowerCheckRequest;
import org.opensearch.cluster.coordination.JoinHelper.InitialJoinAccumulator;
import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -1334,7 +1335,8 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())

final PublicationTransportHandler.PublicationContext publicationContext = publicationHandler.newPublicationContext(
clusterChangedEvent,
coordinationState.get().isRemotePublicationEnabled()
coordinationState.get().isRemotePublicationEnabled(),
persistedStateRegistry
);

final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@

package org.opensearch.cluster.coordination;

import java.util.Locale;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
Expand All @@ -40,13 +43,16 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.IncompatibleClusterStateVersionException;
import org.opensearch.cluster.coordination.CoordinationState.PersistedState;
import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.gateway.GatewayMetaState.RemotePersistedState;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -229,50 +235,35 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
}

private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException {
final Optional<ClusterMetadataManifest> manifestOptional = remoteClusterStateService.getClusterMetadataManifestByTermVersion(
request.getClusterName(),
request.getClusterUUID(),
request.term,
request.version
);
if (manifestOptional.isPresent() == false) {
throw new IllegalStateException(
String.format(Locale.ROOT, "Manifest is not present for term - %s version - %s", request.term, request.version)
);
if (transportService.getLocalNode().equals(request.getSourceNode())) {
return acceptStateOnLocalNode(request);
}
ClusterMetadataManifest manifest = manifestOptional.get();
ClusterMetadataManifest manifest = remoteClusterStateService.getClusterMetadataManifestByFileName(request.getClusterUUID(), request.getManifestFile());
boolean applyFullState = false;
final ClusterState lastSeen = lastSeenClusterState.get();
if (lastSeen == null) {
logger.debug("Diff cannot be applied as there is no last cluster state");
logger.debug(() -> "Diff cannot be applied as there is no last cluster state");
applyFullState = true;
} else if (manifest.getDiffManifest() == null) {
logger.debug("There is no diff in the manifest");
logger.trace(() -> "There is no diff in the manifest");
applyFullState = true;
} else if (manifest.getDiffManifest().getFromStateUUID().equals(lastSeen.stateUUID()) == false) {
logger.debug("Last cluster state not compatible with the diff");
logger.debug(() -> "Last cluster state not compatible with the diff");
applyFullState = true;
}

if (applyFullState == true) {
ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(
request.getClusterName(),
manifest,
transportService.getLocalNode().getId()
);
logger.debug("Downloaded full cluster state [{}]", clusterState);
logger.debug(() -> new ParameterizedMessage("Downloading full cluster state for term {}, version {}, stateUUID {}", manifest.getClusterTerm(), manifest.getStateVersion(),
manifest.getStateUUID()));
ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(request.getClusterName(), manifest, transportService.getLocalNode().getId(), true);
fullClusterStateReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.set(clusterState);
return response;
} else {
ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(
request.getClusterName(),
manifest,
lastSeenClusterState.get(),
transportService.getLocalNode().getId()
);
logger.debug("Downloaded full cluster state from diff [{}]", clusterState);
logger.debug(() -> new ParameterizedMessage("Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}", manifest.getClusterTerm(),
manifest.getStateVersion(), manifest.getDiffManifest().getFromStateUUID(), manifest.getStateUUID()));
ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(request.getClusterName(), manifest, lastSeen, transportService.getLocalNode().getId());
compatibleClusterStateDiffReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
Expand All @@ -293,8 +284,20 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) {
return handlePublishRequest.apply(new PublishRequest(incomingState));
}

public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemotePublicationEnabled) {
final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent, isRemotePublicationEnabled);
/**
 * Handles a remote publish request on the local node by applying the publish request
 * held in {@code currentPublishRequestToSelf} instead of the incoming request's payload.
 *
 * @param remotePublishRequest the incoming request; its term and version must match the
 *                             accepted state of the pending publish request to self
 * @return the response produced by {@code handlePublishRequest} for the pending request
 * @throws IllegalStateException if no publish request to self is pending, or if its accepted
 *                               state has a different term or version than the incoming request
 */
private PublishWithJoinResponse acceptStateOnLocalNode(RemotePublishRequest remotePublishRequest) {
    final PublishRequest pendingSelfRequest = currentPublishRequestToSelf.get();
    if (pendingSelfRequest == null) {
        throw new IllegalStateException("publication to self failed for " + remotePublishRequest);
    }

    final ClusterState pendingState = pendingSelfRequest.getAcceptedState();
    if (pendingState.coordinationMetadata().term() != remotePublishRequest.term
        || pendingState.version() != remotePublishRequest.version) {
        throw new IllegalStateException("publication to self failed for " + remotePublishRequest);
    }

    // Apply first; the last-seen state is only recorded once the handler has returned normally.
    final PublishWithJoinResponse selfResponse = handlePublishRequest.apply(pendingSelfRequest);
    lastSeenClusterState.set(pendingSelfRequest.getAcceptedState());
    return selfResponse;
}

public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemotePublicationEnabled,
PersistedStateRegistry persistedStateRegistry) {
final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent, isRemotePublicationEnabled, persistedStateRegistry);

// Build the serializations we expect to need now, early in the process, so that an error during serialization fails the publication
// straight away. This isn't watertight since we send diffs on a best-effort basis and may fall back to sending a full state (and
Expand Down Expand Up @@ -340,13 +343,15 @@ public class PublicationContext {
private final Map<Version, BytesReference> serializedStates = new HashMap<>();
private final Map<Version, BytesReference> serializedDiffs = new HashMap<>();
private final boolean sendRemoteState;
private final PersistedStateRegistry persistedStateRegistry;

PublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemotePublicationEnabled) {
/**
 * Captures everything needed to publish the state carried by a cluster-changed event.
 *
 * @param clusterChangedEvent        supplies the new state, the previous state and the node set
 * @param isRemotePublicationEnabled stored as {@code sendRemoteState}
 * @param persistedStateRegistry     registry kept for later lookup of persisted states
 */
PublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemotePublicationEnabled, PersistedStateRegistry persistedStateRegistry) {
    // State snapshots taken from the event.
    this.discoveryNodes = clusterChangedEvent.state().nodes();
    this.newState = clusterChangedEvent.state();
    this.previousState = clusterChangedEvent.previousState();

    // Publication mode flags.
    this.sendFullVersion = this.previousState.getBlocks().disableStatePersistence();
    this.sendRemoteState = isRemotePublicationEnabled;

    this.persistedStateRegistry = persistedStateRegistry;
}

void buildDiffAndSerializeStates() {
Expand Down Expand Up @@ -410,7 +415,7 @@ public void onFailure(Exception e) {
} else {
responseActionListener = listener;
}
if (sendRemoteState && destination.isRemoteStateNode()) {
if (sendRemoteState && destination.isRemoteClusterStateEnabled() && destination.isRemoteRoutingTableEnabled()) {
sendRemoteClusterState(destination, publishRequest.getAcceptedState(), responseActionListener);
} else if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination);
Expand Down Expand Up @@ -457,25 +462,16 @@ public String executor() {
);
}

private void sendRemoteClusterState(
DiscoveryNode destination,
ClusterState clusterState,
ActionListener<PublishWithJoinResponse> listener
) {
private void sendRemoteClusterState(final DiscoveryNode destination, final ClusterState clusterState, final ActionListener<PublishWithJoinResponse> listener) {
try {
final RemotePublishRequest remotePublishRequest = new RemotePublishRequest(
discoveryNodes.getLocalNode(),
clusterState.term(),
clusterState.getVersion(),
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID()
);
final String manifestFileName = ((RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE)).getLastUploadedManifestFile();
final RemotePublishRequest remotePublishRequest = new RemotePublishRequest(discoveryNodes.getLocalNode(), clusterState.term(),
clusterState.getVersion(), clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifestFileName);
final Consumer<TransportException> transportExceptionHandler = exp -> {
logger.debug(() -> new ParameterizedMessage("failed to send remote cluster state to {}", destination), exp);
listener.onFailure(exp);
};
final TransportResponseHandler<PublishWithJoinResponse> responseHandler = new TransportResponseHandler<
PublishWithJoinResponse>() {
final TransportResponseHandler<PublishWithJoinResponse> responseHandler = new TransportResponseHandler<>() {

@Override
public PublishWithJoinResponse read(StreamInput in) throws IOException {
Expand All @@ -497,13 +493,7 @@ public String executor() {
return ThreadPool.Names.GENERIC;
}
};
transportService.sendRequest(
destination,
PUBLISH_REMOTE_STATE_ACTION_NAME,
remotePublishRequest,
stateRequestOptions,
responseHandler
);
transportService.sendRequest(destination, PUBLISH_REMOTE_STATE_ACTION_NAME, remotePublishRequest, stateRequestOptions, responseHandler);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("error sending remote cluster state to {}", destination), e);
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,36 @@

public class RemotePublishRequest extends TermVersionRequest {

// TODO: Do we need the cluster name and cluster UUID here?
private final String clusterName;
private final String clusterUUID;
private final String manifestFile;

public RemotePublishRequest(DiscoveryNode sourceNode, long term, long version, String clusterName, String clusterUUID) {
public RemotePublishRequest(DiscoveryNode sourceNode, long term, long version, String clusterName, String clusterUUID, String manifestFile) {
super(sourceNode, term, version);
this.clusterName = clusterName;
this.clusterUUID = clusterUUID;
this.manifestFile = manifestFile;
}

/**
 * Reads a request from the given stream.
 * After the superclass state, the fields are read in the order clusterName, clusterUUID,
 * manifestFile, which is the order {@link #writeTo(StreamOutput)} writes them.
 *
 * @param in stream to read the request from
 * @throws IOException propagated from the underlying stream reads
 */
public RemotePublishRequest(StreamInput in) throws IOException {
super(in);
this.clusterName = in.readString();
this.clusterUUID = in.readString();
// Read last, matching its position at the end of writeTo.
this.manifestFile = in.readString();
}

/**
 * Writes this request to the given stream: superclass state first, then clusterName,
 * clusterUUID and manifestFile. The stream constructor reads the fields back in this order,
 * so the two must be changed together.
 *
 * @param out stream to write the request to
 * @throws IOException propagated from the underlying stream writes
 */
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(clusterName);
out.writeString(clusterUUID);
out.writeString(manifestFile);
}

/**
 * Renders the request with its term, version, cluster name, cluster UUID, source node and
 * manifest file name.
 */
@Override
public String toString() {
    final StringBuilder description = new StringBuilder("RemotePublishRequest{");
    description.append("term=").append(term);
    description.append(", version=").append(version);
    description.append(", clusterName=").append(clusterName);
    description.append(", clusterUUID=").append(clusterUUID);
    description.append(", sourceNode=").append(sourceNode);
    description.append(", manifestFile=").append(manifestFile);
    return description.append('}').toString();
}

public String getClusterName() {
Expand All @@ -46,4 +55,8 @@ public String getClusterName() {
/**
 * @return the cluster UUID carried by this request
 */
public String getClusterUUID() {
    return this.clusterUUID;
}

/**
 * @return the manifest file name carried by this request
 */
public String getManifestFile() {
    return this.manifestFile;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.cluster.node;

import org.opensearch.Version;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.common.UUIDs;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.settings.Setting;
Expand All @@ -43,6 +44,7 @@
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.node.Node;

import java.io.IOException;
Expand Down Expand Up @@ -474,20 +476,20 @@ public boolean isRemoteStoreNode() {

/**
* Returns whether the node is a remote cluster state enabled node.
* @return true if the node contains remote cluster state and remote routing table node attributes, false otherwise
* @return true if the node contains remote cluster state node attribute, false otherwise
*/
public boolean isRemoteStateNode() {
return isRemoteClusterStateEnabled() && isRemoteRoutingTableEnabled();
}

private boolean isRemoteClusterStateEnabled() {
/**
 * Returns whether remote cluster state is enabled on the node.
 *
 * @return true if the node attributes contain the remote cluster state repository name key,
 *         false otherwise
 */
public boolean isRemoteClusterStateEnabled() {
    // Direct key lookup instead of streaming over every attribute key.
    return this.getAttributes().containsKey(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY);
}

private boolean isRemoteRoutingTableEnabled() {
/**
 * Returns whether remote routing table is enabled on the node.
 *
 * @return true if the node attributes contain the remote routing table repository name key,
 *         false otherwise
 */
public boolean isRemoteRoutingTableEnabled() {
    // Direct key lookup instead of streaming over every attribute key.
    return this.getAttributes().containsKey(REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
import org.opensearch.cluster.service.ClusterApplierService;
import org.opensearch.cluster.service.ClusterManagerService;
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
Expand Down Expand Up @@ -715,9 +716,6 @@ public void apply(Settings value, Settings current, Settings previous) {
// Remote cluster state settings
RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING,
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.gateway.GatewayMetaState;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.plugins.DiscoveryPlugin;
Expand Down Expand Up @@ -135,7 +136,8 @@ public DiscoveryModule(
NodeHealthService nodeHealthService,
PersistedStateRegistry persistedStateRegistry,
RemoteStoreNodeService remoteStoreNodeService,
ClusterManagerMetrics clusterManagerMetrics
ClusterManagerMetrics clusterManagerMetrics,
RemoteClusterStateService remoteClusterStateService
) {
final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators = new ArrayList<>();
final Map<String, Supplier<SeedHostsProvider>> hostProviders = new HashMap<>();
Expand Down Expand Up @@ -214,7 +216,8 @@ public DiscoveryModule(
nodeHealthService,
persistedStateRegistry,
remoteStoreNodeService,
clusterManagerMetrics
clusterManagerMetrics,
remoteClusterStateService
);
} else {
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
Expand Down
Loading

0 comments on commit f9b0d4d

Please sign in to comment.