Skip to content

Commit

Permalink
Added wait_for_metadata_version parameter to cluster state api. (#35535)
Browse files Browse the repository at this point in the history
The `wait_for_metadata_version` parameter will instruct the cluster state
api to only return a cluster state until the metadata's version is equal or
greater than the version specified in `wait_for_metadata_version`. If  
the specified `wait_for_timeout` has expired then a timed out response 
is returned. (a response with no cluster state and wait for timed out flag set to true)
In  the case metadata's version is equal or higher than  `wait_for_metadata_version`
then the api will immediately return.

This feature is useful to avoid external components from constantly
polling the cluster state to whether somethings have changed in the
cluster state's metadata.
  • Loading branch information
martijnvg authored Nov 26, 2018
1 parent 00e6fec commit 7624734
Show file tree
Hide file tree
Showing 14 changed files with 351 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private static MockTransportService startTransport(
builder.add(node);
}
ClusterState build = ClusterState.builder(clusterName).nodes(builder.build()).build();
channel.sendResponse(new ClusterStateResponse(clusterName, build, 0L));
channel.sendResponse(new ClusterStateResponse(clusterName, build, 0L, false));
});
newService.start();
newService.acceptIncomingRequests();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@
"type": "boolean",
"description": "Return settings in flat format (default: false)"
},
"wait_for_metadata_version": {
"type": "number",
"description": "Wait for the metadata version to be equal or greater than the specified metadata version"
},
"wait_for_timeout" : {
"type": "time",
"description": "The maximum time to wait for wait_for_metadata_version before timing out"
},
"ignore_unavailable": {
"type" : "boolean",
"description" : "Whether specified concrete indices should be ignored when unavailable (missing or closed)"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,29 @@

package org.elasticsearch.action.admin.cluster.state;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;

import java.io.IOException;

public class ClusterStateRequest extends MasterNodeReadRequest<ClusterStateRequest> implements IndicesRequest.Replaceable {

public static final TimeValue DEFAULT_WAIT_FOR_NODE_TIMEOUT = TimeValue.timeValueMinutes(1);

private boolean routingTable = true;
private boolean nodes = true;
private boolean metaData = true;
private boolean blocks = true;
private boolean customs = true;
private Long waitForMetaDataVersion;
private TimeValue waitForTimeout = DEFAULT_WAIT_FOR_NODE_TIMEOUT;
private String[] indices = Strings.EMPTY_ARRAY;
private IndicesOptions indicesOptions = IndicesOptions.lenientExpandOpen();

Expand All @@ -51,6 +57,11 @@ public ClusterStateRequest(StreamInput in) throws IOException {
customs = in.readBoolean();
indices = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
// TODO: change version to V_6_6_0 after backporting:
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
waitForTimeout = in.readTimeValue();
waitForMetaDataVersion = in.readOptionalLong();
}
}

@Override
Expand All @@ -63,6 +74,11 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(customs);
out.writeStringArray(indices);
indicesOptions.writeIndicesOptions(out);
// TODO: change version to V_6_6_0 after backporting:
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeTimeValue(waitForTimeout);
out.writeOptionalLong(waitForMetaDataVersion);
}
}

@Override
Expand Down Expand Up @@ -156,6 +172,28 @@ public boolean customs() {
return customs;
}

public TimeValue waitForTimeout() {
return waitForTimeout;
}

public ClusterStateRequest waitForTimeout(TimeValue waitForTimeout) {
this.waitForTimeout = waitForTimeout;
return this;
}

public Long waitForMetaDataVersion() {
return waitForMetaDataVersion;
}

public ClusterStateRequest waitForMetaDataVersion(long waitForMetaDataVersion) {
if (waitForMetaDataVersion < 1) {
throw new IllegalArgumentException("provided waitForMetaDataVersion should be >= 1, but instead is [" +
waitForMetaDataVersion + "]");
}
this.waitForMetaDataVersion = waitForMetaDataVersion;
return this;
}

@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;

import java.io.IOException;
import java.util.Objects;

/**
* The response for getting the cluster state.
Expand All @@ -40,14 +42,16 @@ public class ClusterStateResponse extends ActionResponse {
// the total compressed size of the full cluster state, not just
// the parts included in this response
private ByteSizeValue totalCompressedSize;
private boolean waitForTimedOut = false;

public ClusterStateResponse() {
}

public ClusterStateResponse(ClusterName clusterName, ClusterState clusterState, long sizeInBytes) {
public ClusterStateResponse(ClusterName clusterName, ClusterState clusterState, long sizeInBytes, boolean waitForTimedOut) {
this.clusterName = clusterName;
this.clusterState = clusterState;
this.totalCompressedSize = new ByteSizeValue(sizeInBytes);
this.waitForTimedOut = waitForTimedOut;
}

/**
Expand Down Expand Up @@ -75,11 +79,24 @@ public ByteSizeValue getTotalCompressedSize() {
return totalCompressedSize;
}

/**
* Returns whether the request timed out waiting for a cluster state with a metadata version equal or
* higher than the specified metadata.
*/
public boolean isWaitForTimedOut() {
return waitForTimedOut;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
clusterName = new ClusterName(in);
clusterState = ClusterState.readFrom(in, null);
// TODO: change version to V_6_6_0 after backporting:
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
clusterState = in.readOptionalWriteable(innerIn -> ClusterState.readFrom(innerIn, null));
} else {
clusterState = ClusterState.readFrom(in, null);
}
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
totalCompressedSize = new ByteSizeValue(in);
} else {
Expand All @@ -89,19 +106,80 @@ public void readFrom(StreamInput in) throws IOException {
// at which point the correct cluster state size will always be reported
totalCompressedSize = new ByteSizeValue(0L);
}
// TODO: change version to V_6_6_0 after backporting:
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
waitForTimedOut = in.readBoolean();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
clusterName.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
clusterState.writeTo(out);
// TODO: change version to V_6_6_0 after backporting:
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeOptionalWriteable(clusterState);
} else {
ClusterModule.filterCustomsForPre63Clients(clusterState).writeTo(out);
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
clusterState.writeTo(out);
} else {
ClusterModule.filterCustomsForPre63Clients(clusterState).writeTo(out);
}
}
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
totalCompressedSize.writeTo(out);
}
// TODO: change version to V_6_6_0 after backporting:
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeBoolean(waitForTimedOut);
}
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ClusterStateResponse response = (ClusterStateResponse) o;
return waitForTimedOut == response.waitForTimedOut &&
Objects.equals(clusterName, response.clusterName) &&
// Best effort. Only compare cluster state version and master node id,
// because cluster state doesn't implement equals()
Objects.equals(getVersion(clusterState), getVersion(response.clusterState)) &&
Objects.equals(getMasterNodeId(clusterState), getMasterNodeId(response.clusterState)) &&
Objects.equals(totalCompressedSize, response.totalCompressedSize);
}

@Override
public int hashCode() {
// Best effort. Only use cluster state version and master node id,
// because cluster state doesn't implement hashcode()
return Objects.hash(
clusterName,
getVersion(clusterState),
getMasterNodeId(clusterState),
totalCompressedSize,
waitForTimedOut
);
}

private static String getMasterNodeId(ClusterState clusterState) {
if (clusterState == null) {
return null;
}
DiscoveryNodes nodes = clusterState.getNodes();
if (nodes != null) {
return nodes.getMasterNodeId();
} else {
return null;
}
}

private static Long getVersion(ClusterState clusterState) {
if (clusterState != null) {
return clusterState.getVersion();
} else {
return null;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand All @@ -33,10 +34,13 @@
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.function.Predicate;

import static org.elasticsearch.discovery.zen.PublishClusterStateAction.serializeFullClusterState;

Expand Down Expand Up @@ -74,7 +78,51 @@ protected ClusterStateResponse newResponse() {
@Override
protected void masterOperation(final ClusterStateRequest request, final ClusterState state,
final ActionListener<ClusterStateResponse> listener) throws IOException {
ClusterState currentState = clusterService.state();

if (request.waitForMetaDataVersion() != null) {
final Predicate<ClusterState> metadataVersionPredicate = clusterState -> {
return clusterState.metaData().version() >= request.waitForMetaDataVersion();
};
final ClusterStateObserver observer =
new ClusterStateObserver(clusterService, request.waitForTimeout(), logger, threadPool.getThreadContext());
final ClusterState clusterState = observer.setAndGetObservedState();
if (metadataVersionPredicate.test(clusterState)) {
buildResponse(request, clusterState, listener);
} else {
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
try {
buildResponse(request, state, listener);
} catch (Exception e) {
listener.onFailure(e);
}
}

@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}

@Override
public void onTimeout(TimeValue timeout) {
try {
listener.onResponse(new ClusterStateResponse(clusterState.getClusterName(), null, 0L, true));
} catch (Exception e) {
listener.onFailure(e);
}
}
}, metadataVersionPredicate);
}
} else {
ClusterState currentState = clusterService.state();
buildResponse(request, currentState, listener);
}
}

private void buildResponse(final ClusterStateRequest request,
final ClusterState currentState,
final ActionListener<ClusterStateResponse> listener) throws IOException {
logger.trace("Serving cluster state request using version {}", currentState.version());
ClusterState.Builder builder = ClusterState.builder(currentState.getClusterName());
builder.version(currentState.version());
Expand Down Expand Up @@ -133,7 +181,7 @@ protected void masterOperation(final ClusterStateRequest request, final ClusterS
}
}
listener.onResponse(new ClusterStateResponse(currentState.getClusterName(), builder.build(),
serializeFullClusterState(currentState, Version.CURRENT).length()));
serializeFullClusterState(currentState, Version.CURRENT).length(), false));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
clusterStateRequest.indicesOptions(IndicesOptions.fromRequest(request, clusterStateRequest.indicesOptions()));
clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local()));
clusterStateRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterStateRequest.masterNodeTimeout()));
if (request.hasParam("wait_for_metadata_version")) {
clusterStateRequest.waitForMetaDataVersion(request.paramAsLong("wait_for_metadata_version", 0));
}
clusterStateRequest.waitForTimeout(request.paramAsTime("wait_for_timeout", ClusterStateRequest.DEFAULT_WAIT_FOR_NODE_TIMEOUT));

final String[] indices = Strings.splitStringByCommaToArray(request.param("indices", "_all"));
boolean isAllIndicesOnly = indices.length == 1 && "_all".equals(indices[0]);
Expand Down Expand Up @@ -94,6 +98,9 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
@Override
public RestResponse buildResponse(ClusterStateResponse response, XContentBuilder builder) throws Exception {
builder.startObject();
if (clusterStateRequest.waitForMetaDataVersion() != null) {
builder.field(Fields.WAIT_FOR_TIMED_OUT, response.isWaitForTimedOut());
}
builder.field(Fields.CLUSTER_NAME, response.getClusterName().value());
builder.humanReadableField(Fields.CLUSTER_STATE_SIZE_IN_BYTES, Fields.CLUSTER_STATE_SIZE,
response.getTotalCompressedSize());
Expand Down Expand Up @@ -124,6 +131,7 @@ public boolean canTripCircuitBreaker() {
}

static final class Fields {
static final String WAIT_FOR_TIMED_OUT = "wait_for_timed_out";
static final String CLUSTER_NAME = "cluster_name";
static final String CLUSTER_STATE_SIZE = "compressed_size";
static final String CLUSTER_STATE_SIZE_IN_BYTES = "compressed_size_in_bytes";
Expand Down
Loading

0 comments on commit 7624734

Please sign in to comment.