Skip to content

Commit

Permalink
Add publication flag and remote routing table check
Browse files Browse the repository at this point in the history
Signed-off-by: Himshikha Gupta <himshikh@amazon.com>
  • Loading branch information
Himshikha Gupta authored and soosinha committed Jun 7, 2024
1 parent 6c93d98 commit 3f677bd
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.Set;

import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
Expand All @@ -78,7 +79,7 @@ public class CoordinationState {
private long lastPublishedVersion;
private VotingConfiguration lastPublishedConfiguration;
private VoteCollection publishVotes;
private final boolean isRemoteStateEnabled;
private final boolean isRemotePublicationEnabled;

public CoordinationState(
DiscoveryNode localNode,
Expand All @@ -101,11 +102,11 @@ public CoordinationState(
.getLastAcceptedState()
.getLastAcceptedConfiguration();
this.publishVotes = new VoteCollection();
this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings);
this.isRemotePublicationEnabled = isRemoteStoreClusterStateEnabled(settings) && isRemoteRoutingTableEnabled(settings);
}

public boolean isRemoteStateEnabled() {
return isRemoteStateEnabled;
public boolean isRemotePublicationEnabled() {
return isRemotePublicationEnabled;
}

public long getCurrentTerm() {
Expand Down Expand Up @@ -579,7 +580,7 @@ public void handlePrePublish(ClusterState clusterState) {
// This is to ensure the remote store is the single source of truth for current state. Even if the current node
// goes down after sending the cluster state to other nodes, we should be able to read the remote state and
// recover the cluster.
if (isRemoteStateEnabled) {
if (isRemotePublicationEnabled) {
assert persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null : "Remote state has not been initialized";
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(clusterState);
}
Expand All @@ -590,7 +591,7 @@ public void handlePrePublish(ClusterState clusterState) {
*/
public void handlePreCommit() {
// Publishing the committed state to remote store before sending apply commit to other nodes.
if (isRemoteStateEnabled) {
if (isRemotePublicationEnabled) {
assert persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null : "Remote state has not been initialized";
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).markLastAcceptedStateAsCommitted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1333,7 +1333,8 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
+ clusterState;

final PublicationTransportHandler.PublicationContext publicationContext = publicationHandler.newPublicationContext(
clusterChangedEvent, coordinationState.get().isRemoteStateEnabled()
clusterChangedEvent,
coordinationState.get().isRemotePublicationEnabled()
);

final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@

package org.opensearch.cluster.coordination;

import java.util.Locale;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
Expand Down Expand Up @@ -61,7 +59,9 @@

import java.io.IOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -229,7 +229,12 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
}

private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException {
final Optional<ClusterMetadataManifest> manifestOptional = remoteClusterStateService.getClusterMetadataManifestByTermVersion(request.getClusterName(), request.getClusterUUID(), request.term, request.version);
final Optional<ClusterMetadataManifest> manifestOptional = remoteClusterStateService.getClusterMetadataManifestByTermVersion(
request.getClusterName(),
request.getClusterUUID(),
request.term,
request.version
);
if (manifestOptional.isPresent() == false) {
throw new IllegalStateException(
String.format(Locale.ROOT, "Manifest is not present for term - %s version - %s", request.term, request.version)
Expand All @@ -250,14 +255,23 @@ private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublish
}

if (applyFullState == true) {
ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(request.getClusterName(), manifest, transportService.getLocalNode().getId());
ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(
request.getClusterName(),
manifest,
transportService.getLocalNode().getId()
);
logger.debug("Downloaded full cluster state [{}]", clusterState);
fullClusterStateReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.set(clusterState);
return response;
} else {
ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(request.getClusterName(), manifest, lastSeenClusterState.get(), transportService.getLocalNode().getId());
ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(
request.getClusterName(),
manifest,
lastSeenClusterState.get(),
transportService.getLocalNode().getId()
);
logger.debug("Downloaded full cluster state from diff [{}]", clusterState);
compatibleClusterStateDiffReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
Expand All @@ -279,8 +293,8 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) {
return handlePublishRequest.apply(new PublishRequest(incomingState));
}

public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemoteStateEnabled) {
final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent, isRemoteStateEnabled);
public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemotePublicationEnabled) {
final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent, isRemotePublicationEnabled);

// Build the serializations we expect to need now, early in the process, so that an error during serialization fails the publication
// straight away. This isn't watertight since we send diffs on a best-effort basis and may fall back to sending a full state (and
Expand Down Expand Up @@ -327,12 +341,12 @@ public class PublicationContext {
private final Map<Version, BytesReference> serializedDiffs = new HashMap<>();
private final boolean sendRemoteState;

PublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemoteStateEnabled) {
PublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemotePublicationEnabled) {
discoveryNodes = clusterChangedEvent.state().nodes();
newState = clusterChangedEvent.state();
previousState = clusterChangedEvent.previousState();
sendFullVersion = previousState.getBlocks().disableStatePersistence();
sendRemoteState = isRemoteStateEnabled;
sendRemoteState = isRemotePublicationEnabled;
}

void buildDiffAndSerializeStates() {
Expand Down Expand Up @@ -443,9 +457,19 @@ public String executor() {
);
}

private void sendRemoteClusterState(DiscoveryNode destination, ClusterState clusterState, ActionListener<PublishWithJoinResponse> listener) {
private void sendRemoteClusterState(
DiscoveryNode destination,
ClusterState clusterState,
ActionListener<PublishWithJoinResponse> listener
) {
try {
final RemotePublishRequest remotePublishRequest = new RemotePublishRequest(discoveryNodes.getLocalNode(), clusterState.term(), clusterState.getVersion(), clusterState.getClusterName().value(), clusterState.metadata().clusterUUID());
final RemotePublishRequest remotePublishRequest = new RemotePublishRequest(
discoveryNodes.getLocalNode(),
clusterState.term(),
clusterState.getVersion(),
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID()
);
final Consumer<TransportException> transportExceptionHandler = exp -> {
logger.debug(() -> new ParameterizedMessage("failed to send remote cluster state to {}", destination), exp);
listener.onFailure(exp);
Expand Down Expand Up @@ -473,7 +497,13 @@ public String executor() {
return ThreadPool.Names.GENERIC;
}
};
transportService.sendRequest(destination, PUBLISH_REMOTE_STATE_ACTION_NAME, remotePublishRequest, stateRequestOptions, responseHandler);
transportService.sendRequest(
destination,
PUBLISH_REMOTE_STATE_ACTION_NAME,
remotePublishRequest,
stateRequestOptions,
responseHandler
);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("error sending remote cluster state to {}", destination), e);
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@

package org.opensearch.cluster.coordination;

import java.io.IOException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;

public class RemotePublishRequest extends TermVersionRequest {

// TODO: Do we need the cluster name and UUID?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@
import java.util.stream.Stream;

import static org.opensearch.node.NodeRoleSettings.NODE_ROLES_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;

/**
* A discovery node represents a node that is part of the cluster.
Expand Down Expand Up @@ -470,6 +472,25 @@ public boolean isRemoteStoreNode() {
return this.getAttributes().keySet().stream().anyMatch(key -> key.startsWith(REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX));
}

/**
 * Reports whether this node qualifies as a remote state node.
 * <p>
 * A node qualifies only when both node-attribute checks pass: the remote cluster state check
 * and the remote routing table check.
 *
 * @return true if both the remote cluster state and remote routing table attribute checks pass, false otherwise
 */
public boolean isRemoteStateNode() {
    // Short-circuit: the routing table check is skipped when the cluster state check already fails.
    if (isRemoteClusterStateEnabled() == false) {
        return false;
    }
    return isRemoteRoutingTableEnabled();
}

/**
 * Checks whether this node's attributes contain the remote cluster state repository name key.
 * <p>
 * Uses a direct key lookup on the attribute map instead of streaming over every key.
 *
 * @return true if the attribute key {@code REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY} is present, false otherwise
 */
private boolean isRemoteClusterStateEnabled() {
    return this.getAttributes().containsKey(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY);
}

/**
 * Checks whether this node's attributes contain the remote routing table repository name key.
 * <p>
 * Uses a direct key lookup on the attribute map instead of streaming over every key.
 *
 * @return true if the attribute key {@code REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY} is present, false otherwise
 */
private boolean isRemoteRoutingTableEnabled() {
    return this.getAttributes().containsKey(REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY);
}

/**
* Returns a set of all the roles that the node has. The roles are returned in sorted order by the role name.
* <p>
Expand Down

0 comments on commit 3f677bd

Please sign in to comment.