Skip to content

Commit

Permalink
Add remote state publication transport call (opensearch-project#13835)
Browse files Browse the repository at this point in the history
* Add remote state publication transport call

Signed-off-by: Sooraj Sinha <soosinha@amazon.com>

* Add publication flag and remote routing table check

Signed-off-by: Himshikha Gupta <himshikh@amazon.com>
  • Loading branch information
soosinha authored and parv0201 committed Jun 10, 2024
1 parent a74cc47 commit dbe63a0
Show file tree
Hide file tree
Showing 20 changed files with 707 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.io.IOUtils;

import java.io.Closeable;
Expand All @@ -52,6 +53,7 @@
import java.util.Set;

import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;
import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
Expand Down Expand Up @@ -79,6 +81,7 @@ public class CoordinationState {
private VotingConfiguration lastPublishedConfiguration;
private VoteCollection publishVotes;
private final boolean isRemoteStateEnabled;
private final boolean isRemotePublicationEnabled;

public CoordinationState(
DiscoveryNode localNode,
Expand All @@ -102,6 +105,12 @@ public CoordinationState(
.getLastAcceptedConfiguration();
this.publishVotes = new VoteCollection();
this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings);
this.isRemotePublicationEnabled = FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL)
&& localNode.isRemoteStatePublicationEnabled();
}

public boolean isRemotePublicationEnabled() {
return isRemotePublicationEnabled;
}

public long getCurrentTerm() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.opensearch.discovery.PeerFinder;
import org.opensearch.discovery.SeedHostsProvider;
import org.opensearch.discovery.SeedHostsResolver;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
Expand Down Expand Up @@ -209,7 +210,8 @@ public Coordinator(
NodeHealthService nodeHealthService,
PersistedStateRegistry persistedStateRegistry,
RemoteStoreNodeService remoteStoreNodeService,
ClusterManagerMetrics clusterManagerMetrics
ClusterManagerMetrics clusterManagerMetrics,
RemoteClusterStateService remoteClusterStateService
) {
this.settings = settings;
this.transportService = transportService;
Expand Down Expand Up @@ -261,7 +263,8 @@ public Coordinator(
transportService,
namedWriteableRegistry,
this::handlePublishRequest,
this::handleApplyCommit
this::handleApplyCommit,
remoteClusterStateService
);
this.leaderChecker = new LeaderChecker(
settings,
Expand Down Expand Up @@ -1330,7 +1333,9 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
+ clusterState;

final PublicationTransportHandler.PublicationContext publicationContext = publicationHandler.newPublicationContext(
clusterChangedEvent
clusterChangedEvent,
coordinationState.get().isRemotePublicationEnabled(),
persistedStateRegistry
);

final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,17 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.IncompatibleClusterStateVersionException;
import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.gateway.GatewayMetaState.RemotePersistedState;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.BytesTransportRequest;
import org.opensearch.transport.TransportChannel;
Expand Down Expand Up @@ -74,6 +78,7 @@ public class PublicationTransportHandler {
private static final Logger logger = LogManager.getLogger(PublicationTransportHandler.class);

public static final String PUBLISH_STATE_ACTION_NAME = "internal:cluster/coordination/publish_state";
public static final String PUBLISH_REMOTE_STATE_ACTION_NAME = "internal:cluster/coordination/publish_remote_state";
public static final String COMMIT_STATE_ACTION_NAME = "internal:cluster/coordination/commit_state";

private final TransportService transportService;
Expand All @@ -97,16 +102,19 @@ public class PublicationTransportHandler {
private final TransportRequestOptions stateRequestOptions = TransportRequestOptions.builder()
.withType(TransportRequestOptions.Type.STATE)
.build();
private final RemoteClusterStateService remoteClusterStateService;

public PublicationTransportHandler(
TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry,
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest,
BiConsumer<ApplyCommitRequest, ActionListener<Void>> handleApplyCommit
BiConsumer<ApplyCommitRequest, ActionListener<Void>> handleApplyCommit,
RemoteClusterStateService remoteClusterStateService
) {
this.transportService = transportService;
this.namedWriteableRegistry = namedWriteableRegistry;
this.handlePublishRequest = handlePublishRequest;
this.remoteClusterStateService = remoteClusterStateService;

transportService.registerRequestHandler(
PUBLISH_STATE_ACTION_NAME,
Expand All @@ -117,6 +125,15 @@ public PublicationTransportHandler(
(request, channel, task) -> channel.sendResponse(handleIncomingPublishRequest(request))
);

transportService.registerRequestHandler(
PUBLISH_REMOTE_STATE_ACTION_NAME,
ThreadPool.Names.GENERIC,
false,
false,
RemotePublishRequest::new,
(request, channel, task) -> channel.sendResponse(handleIncomingRemotePublishRequest(request))
);

transportService.registerRequestHandler(
COMMIT_STATE_ACTION_NAME,
ThreadPool.Names.GENERIC,
Expand Down Expand Up @@ -211,6 +228,74 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
}
}

// package private for testing
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException {
if (transportService.getLocalNode().equals(request.getSourceNode())) {
return acceptRemoteStateOnLocalNode(request);
}
// TODO Make cluster state download non-blocking: https://github.com/opensearch-project/OpenSearch/issues/14102
ClusterMetadataManifest manifest = remoteClusterStateService.getClusterMetadataManifestByFileName(
request.getClusterUUID(),
request.getManifestFile()
);
if (manifest == null) {
throw new IllegalStateException("Publication failed as manifest was not found for " + request);
}
boolean applyFullState = false;
final ClusterState lastSeen = lastSeenClusterState.get();
if (lastSeen == null) {
logger.debug(() -> "Diff cannot be applied as there is no last cluster state");
applyFullState = true;
} else if (manifest.getDiffManifest() == null) {
logger.trace(() -> "There is no diff in the manifest");
applyFullState = true;
} else if (manifest.getDiffManifest().getFromStateUUID().equals(lastSeen.stateUUID()) == false) {
logger.debug(() -> "Last cluster state not compatible with the diff");
applyFullState = true;
}

if (applyFullState == true) {
logger.debug(
() -> new ParameterizedMessage(
"Downloading full cluster state for term {}, version {}, stateUUID {}",
manifest.getClusterTerm(),
manifest.getStateVersion(),
manifest.getStateUUID()
)
);
ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(
request.getClusterName(),
manifest,
transportService.getLocalNode().getId(),
true
);
fullClusterStateReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.set(clusterState);
return response;
} else {
logger.debug(
() -> new ParameterizedMessage(
"Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}",
manifest.getClusterTerm(),
manifest.getStateVersion(),
manifest.getDiffManifest().getFromStateUUID(),
manifest.getStateUUID()
)
);
ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(
request.getClusterName(),
manifest,
lastSeen,
transportService.getLocalNode().getId()
);
compatibleClusterStateDiffReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
return response;
}
}

private PublishWithJoinResponse acceptState(ClusterState incomingState) {
// if the state is coming from the current node, use original request instead (see currentPublishRequestToSelf for explanation)
if (transportService.getLocalNode().equals(incomingState.nodes().getClusterManagerNode())) {
Expand All @@ -224,8 +309,35 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) {
return handlePublishRequest.apply(new PublishRequest(incomingState));
}

public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent) {
final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent);
private PublishWithJoinResponse acceptRemoteStateOnLocalNode(RemotePublishRequest remotePublishRequest) {
final PublishRequest publishRequest = currentPublishRequestToSelf.get();
if (publishRequest == null
|| publishRequest.getAcceptedState().coordinationMetadata().term() != remotePublishRequest.term
|| publishRequest.getAcceptedState().version() != remotePublishRequest.version) {
logger.debug(
() -> new ParameterizedMessage(
"Publication failure for current publish request : {} and remote publish request: {}",
publishRequest,
remotePublishRequest
)
);
throw new IllegalStateException("publication to self failed for " + remotePublishRequest);
}
PublishWithJoinResponse publishWithJoinResponse = handlePublishRequest.apply(publishRequest);
lastSeenClusterState.set(publishRequest.getAcceptedState());
return publishWithJoinResponse;
}

public PublicationContext newPublicationContext(
ClusterChangedEvent clusterChangedEvent,
boolean isRemotePublicationEnabled,
PersistedStateRegistry persistedStateRegistry
) {
final PublicationContext publicationContext = new PublicationContext(
clusterChangedEvent,
isRemotePublicationEnabled,
persistedStateRegistry
);

// Build the serializations we expect to need now, early in the process, so that an error during serialization fails the publication
// straight away. This isn't watertight since we send diffs on a best-effort basis and may fall back to sending a full state (and
Expand All @@ -234,6 +346,16 @@ public PublicationContext newPublicationContext(ClusterChangedEvent clusterChang
return publicationContext;
}

// package private for testing
void setCurrentPublishRequestToSelf(PublishRequest publishRequest) {
this.currentPublishRequestToSelf.set(publishRequest);
}

// package private for testing
void setLastSeenClusterState(ClusterState clusterState) {
this.lastSeenClusterState.set(clusterState);
}

private static BytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException {
final BytesReference serializedState = CompressedStreamUtils.createCompressedStream(nodeVersion, stream -> {
stream.writeBoolean(true);
Expand Down Expand Up @@ -270,12 +392,20 @@ public class PublicationContext {
private final boolean sendFullVersion;
private final Map<Version, BytesReference> serializedStates = new HashMap<>();
private final Map<Version, BytesReference> serializedDiffs = new HashMap<>();
private final boolean sendRemoteState;
private final PersistedStateRegistry persistedStateRegistry;

PublicationContext(ClusterChangedEvent clusterChangedEvent) {
PublicationContext(
ClusterChangedEvent clusterChangedEvent,
boolean isRemotePublicationEnabled,
PersistedStateRegistry persistedStateRegistry
) {
discoveryNodes = clusterChangedEvent.state().nodes();
newState = clusterChangedEvent.state();
previousState = clusterChangedEvent.previousState();
sendFullVersion = previousState.getBlocks().disableStatePersistence();
sendRemoteState = isRemotePublicationEnabled;
this.persistedStateRegistry = persistedStateRegistry;
}

void buildDiffAndSerializeStates() {
Expand Down Expand Up @@ -339,7 +469,11 @@ public void onFailure(Exception e) {
} else {
responseActionListener = listener;
}
if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
// TODO Decide to send remote state before starting publication by checking remote publication on all nodes
if (sendRemoteState && destination.isRemoteStatePublicationEnabled()) {
logger.trace("sending remote cluster state version [{}] to [{}]", newState.version(), destination);
sendRemoteClusterState(destination, publishRequest.getAcceptedState(), responseActionListener);
} else if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination);
sendFullClusterState(destination, responseActionListener);
} else {
Expand Down Expand Up @@ -384,6 +518,61 @@ public String executor() {
);
}

private void sendRemoteClusterState(
final DiscoveryNode destination,
final ClusterState clusterState,
final ActionListener<PublishWithJoinResponse> listener
) {
try {
final String manifestFileName = ((RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE))
.getLastUploadedManifestFile();
final RemotePublishRequest remotePublishRequest = new RemotePublishRequest(
discoveryNodes.getLocalNode(),
clusterState.term(),
clusterState.getVersion(),
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID(),
manifestFileName
);
final Consumer<TransportException> transportExceptionHandler = exp -> {
logger.debug(() -> new ParameterizedMessage("failed to send remote cluster state to {}", destination), exp);
listener.onFailure(exp);
};
final TransportResponseHandler<PublishWithJoinResponse> responseHandler = new TransportResponseHandler<>() {

@Override
public PublishWithJoinResponse read(StreamInput in) throws IOException {
return new PublishWithJoinResponse(in);
}

@Override
public void handleResponse(PublishWithJoinResponse response) {
listener.onResponse(response);
}

@Override
public void handleException(TransportException exp) {
transportExceptionHandler.accept(exp);
}

@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
};
transportService.sendRequest(
destination,
PUBLISH_REMOTE_STATE_ACTION_NAME,
remotePublishRequest,
stateRequestOptions,
responseHandler
);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("error sending remote cluster state to {}", destination), e);
listener.onFailure(e);
}
}

private void sendFullClusterState(DiscoveryNode destination, ActionListener<PublishWithJoinResponse> listener) {
BytesReference bytes = serializedStates.get(destination.getVersion());
if (bytes == null) {
Expand Down
Loading

0 comments on commit dbe63a0

Please sign in to comment.