Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce long polling for changes #33683

Merged
merged 11 commits into from
Sep 16, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
Expand All @@ -19,6 +20,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IndexShard;
Expand All @@ -36,8 +38,10 @@
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeoutException;

import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

public class ShardChangesAction extends Action<ShardChangesAction.Response> {

Expand All @@ -59,6 +63,7 @@ public static class Request extends SingleShardRequest<Request> {
private int maxOperationCount;
private ShardId shardId;
private String expectedHistoryUUID;
private TimeValue pollTimeout = FollowIndexAction.DEFAULT_POLL_TIMEOUT;
private long maxOperationSizeInBytes = FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES;

public Request(ShardId shardId, String expectedHistoryUUID) {
Expand Down Expand Up @@ -102,6 +107,14 @@ public String getExpectedHistoryUUID() {
return expectedHistoryUUID;
}

public TimeValue getPollTimeout() {
return pollTimeout;
}

public void setPollTimeout(final TimeValue pollTimeout) {
this.pollTimeout = Objects.requireNonNull(pollTimeout, "pollTimeout");
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
Expand All @@ -126,6 +139,7 @@ public void readFrom(StreamInput in) throws IOException {
maxOperationCount = in.readVInt();
shardId = ShardId.readShardId(in);
expectedHistoryUUID = in.readString();
pollTimeout = in.readTimeValue();
maxOperationSizeInBytes = in.readVLong();
}

Expand All @@ -136,6 +150,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(maxOperationCount);
shardId.writeTo(out);
out.writeString(expectedHistoryUUID);
out.writeTimeValue(pollTimeout);
out.writeVLong(maxOperationSizeInBytes);
}

Expand All @@ -149,12 +164,13 @@ public boolean equals(final Object o) {
maxOperationCount == request.maxOperationCount &&
Objects.equals(shardId, request.shardId) &&
Objects.equals(expectedHistoryUUID, request.expectedHistoryUUID) &&
Objects.equals(pollTimeout, request.pollTimeout) &&
maxOperationSizeInBytes == request.maxOperationSizeInBytes;
}

@Override
public int hashCode() {
return Objects.hash(fromSeqNo, maxOperationCount, shardId, expectedHistoryUUID, maxOperationSizeInBytes);
return Objects.hash(fromSeqNo, maxOperationCount, shardId, expectedHistoryUUID, pollTimeout, maxOperationSizeInBytes);
}

@Override
Expand All @@ -164,6 +180,7 @@ public String toString() {
", maxOperationCount=" + maxOperationCount +
", shardId=" + shardId +
", expectedHistoryUUID=" + expectedHistoryUUID +
", pollTimeout=" + pollTimeout +
", maxOperationSizeInBytes=" + maxOperationSizeInBytes +
'}';
}
Expand Down Expand Up @@ -265,19 +282,61 @@ public TransportAction(Settings settings,

@Override
protected Response shardOperation(Request request, ShardId shardId) throws IOException {
IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
IndexShard indexShard = indexService.getShard(request.getShard().id());
final SeqNoStats seqNoStats = indexShard.seqNoStats();
final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
final IndexShard indexShard = indexService.getShard(request.getShard().id());
final SeqNoStats seqNoStats = indexShard.seqNoStats();
final long mappingVersion = clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion();

final Translog.Operation[] operations = getOperations(
indexShard,
seqNoStats.getGlobalCheckpoint(),
request.fromSeqNo,
request.maxOperationCount,
request.expectedHistoryUUID,
request.maxOperationSizeInBytes);
return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations);
request.getFromSeqNo(),
request.getMaxOperationCount(),
request.getExpectedHistoryUUID(),
request.getMaxOperationSizeInBytes());
return getResponse(mappingVersion, seqNoStats, operations);
}

@Override
protected void asyncShardOperation(
final Request request,
final ShardId shardId,
final ActionListener<Response> listener) throws IOException {
final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
jasontedor marked this conversation as resolved.
Show resolved Hide resolved
final IndexShard indexShard = indexService.getShard(request.getShard().id());
final SeqNoStats seqNoStats = indexShard.seqNoStats();

if (request.getFromSeqNo() > seqNoStats.getGlobalCheckpoint()) {
assert request.getFromSeqNo() == 1 + seqNoStats.getGlobalCheckpoint();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if this assertion always holds.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dnhatn Please see my latest pushes. I have integrated #33690 so that we only wake up if we advanced to the request from sequence number, or timeout.

indexShard.addGlobalCheckpointListener(
request.getFromSeqNo(),
(g, e) -> {
if (g == UNASSIGNED_SEQ_NO) {
assert e != null;
if (e instanceof TimeoutException) {
try {
final long mappingVersion =
clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion();
final SeqNoStats latestSeqNoStats = indexShard.seqNoStats();
listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, EMPTY_OPERATIONS_ARRAY));
} catch (final Exception caught) {
listener.onFailure(caught);
}
} else {
listener.onFailure(e);
}
} else {
try {
super.asyncShardOperation(request, shardId, listener);
} catch (final IOException e1) {
listener.onFailure(e1);
}
}
},
request.getPollTimeout());
} else {
super.asyncShardOperation(request, shardId, listener);
}
}

@Override
Expand All @@ -300,7 +359,7 @@ protected Response newResponse() {

}

private static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0];
static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0];

/**
* Returns at most maxOperationCount operations from the specified from sequence number.
Expand All @@ -324,7 +383,8 @@ static Translog.Operation[] getOperations(IndexShard indexShard,
historyUUID + "]");
}
if (fromSeqNo > globalCheckpoint) {
return EMPTY_OPERATIONS_ARRAY;
throw new IllegalStateException(
"not exposing operations from [" + fromSeqNo + "] greater than the global checkpoint [" + globalCheckpoint + "]");
}
int seenBytes = 0;
// - 1 is needed, because toSeqNo is inclusive
Expand All @@ -344,4 +404,8 @@ static Translog.Operation[] getOperations(IndexShard indexShard,
return operations.toArray(EMPTY_OPERATIONS_ARRAY);
}

static Response getResponse(final long mappingVersion, final SeqNoStats seqNoStats, final Translog.Operation[] operations) {
return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {

private final String leaderIndex;
private final ShardFollowTask params;
private final TimeValue pollTimeout;
private final TimeValue maxRetryDelay;
private final TimeValue idleShardChangesRequestDelay;
private final BiConsumer<TimeValue, Runnable> scheduler;
private final LongSupplier relativeTimeProvider;

Expand Down Expand Up @@ -80,8 +80,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
this.params = params;
this.scheduler = scheduler;
this.relativeTimeProvider = relativeTimeProvider;
this.pollTimeout = params.getPollTimeout();
this.maxRetryDelay = params.getMaxRetryDelay();
this.idleShardChangesRequestDelay = params.getIdleShardRetryDelay();
/*
* We keep track of the most recent fetch exceptions, with the number of exceptions that we track equal to the maximum number of
* concurrent fetches. For each failed fetch, we track the from sequence number associated with the request, and we clear the entry
Expand Down Expand Up @@ -227,12 +227,16 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR
}
innerSendShardChangesRequest(from, maxOperationCount,
response -> {
synchronized (ShardFollowNodeTask.this) {
totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
numberOfSuccessfulFetches++;
fetchExceptions.remove(from);
operationsReceived += response.getOperations().length;
totalTransferredBytes += Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::estimateSize).sum();
if (response.getOperations().length > 0) {
// do not count polls against fetch stats
synchronized (ShardFollowNodeTask.this) {
totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
numberOfSuccessfulFetches++;
fetchExceptions.remove(from);
operationsReceived += response.getOperations().length;
totalTransferredBytes +=
Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::estimateSize).sum();
}
}
handleReadResponse(from, maxRequiredSeqNo, response);
},
Expand Down Expand Up @@ -284,15 +288,7 @@ synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, Shar
} else {
// read is completed, decrement
numConcurrentReads--;
if (response.getOperations().length == 0 && leaderGlobalCheckpoint == lastRequestedSeqNo) {
// we got nothing and we have no reason to believe asking again well get us more, treat shard as idle and delay
// future requests
LOGGER.trace("{} received no ops and no known ops to fetch, scheduling to coordinate reads",
params.getFollowShardId());
scheduler.accept(idleShardChangesRequestDelay, this::coordinateReads);
} else {
coordinateReads();
}
coordinateReads();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
public static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches");
public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size");
public static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay");
public static final ParseField IDLE_SHARD_RETRY_DELAY = new ParseField("idle_shard_retry_delay");
public static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout");
public static final ParseField RECORDED_HISTORY_UUID = new ParseField("recorded_history_uuid");

@SuppressWarnings("unchecked")
Expand All @@ -75,8 +75,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
(p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY.getPreferredName()),
MAX_RETRY_DELAY, ObjectParser.ValueType.STRING);
PARSER.declareField(ConstructingObjectParser.constructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), IDLE_SHARD_RETRY_DELAY.getPreferredName()),
IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING);
(p, c) -> TimeValue.parseTimeValue(p.text(), POLL_TIMEOUT.getPreferredName()),
POLL_TIMEOUT, ObjectParser.ValueType.STRING);
PARSER.declareString(ConstructingObjectParser.constructorArg(), RECORDED_HISTORY_UUID);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), HEADERS);
}
Expand All @@ -90,23 +90,23 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
private final int maxConcurrentWriteBatches;
private final int maxWriteBufferSize;
private final TimeValue maxRetryDelay;
private final TimeValue idleShardRetryDelay;
private final TimeValue pollTimeout;
private final String recordedLeaderIndexHistoryUUID;
private final Map<String, String> headers;

ShardFollowTask(
String leaderClusterAlias,
ShardId followShardId,
ShardId leaderShardId,
int maxBatchOperationCount,
int maxConcurrentReadBatches,
long maxBatchSizeInBytes,
int maxConcurrentWriteBatches,
int maxWriteBufferSize,
TimeValue maxRetryDelay,
TimeValue idleShardRetryDelay,
String recordedLeaderIndexHistoryUUID,
Map<String, String> headers) {
final String leaderClusterAlias,
final ShardId followShardId,
final ShardId leaderShardId,
final int maxBatchOperationCount,
final int maxConcurrentReadBatches,
final long maxBatchSizeInBytes,
final int maxConcurrentWriteBatches,
final int maxWriteBufferSize,
final TimeValue maxRetryDelay,
final TimeValue pollTimeout,
final String recordedLeaderIndexHistoryUUID,
final Map<String, String> headers) {
this.leaderClusterAlias = leaderClusterAlias;
this.followShardId = followShardId;
this.leaderShardId = leaderShardId;
Expand All @@ -116,7 +116,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
this.maxConcurrentWriteBatches = maxConcurrentWriteBatches;
this.maxWriteBufferSize = maxWriteBufferSize;
this.maxRetryDelay = maxRetryDelay;
this.idleShardRetryDelay = idleShardRetryDelay;
this.pollTimeout = pollTimeout;
this.recordedLeaderIndexHistoryUUID = recordedLeaderIndexHistoryUUID;
this.headers = headers != null ? Collections.unmodifiableMap(headers) : Collections.emptyMap();
}
Expand All @@ -131,7 +131,7 @@ public ShardFollowTask(StreamInput in) throws IOException {
this.maxConcurrentWriteBatches = in.readVInt();
this.maxWriteBufferSize = in.readVInt();
this.maxRetryDelay = in.readTimeValue();
this.idleShardRetryDelay = in.readTimeValue();
this.pollTimeout = in.readTimeValue();
this.recordedLeaderIndexHistoryUUID = in.readString();
this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString));
}
Expand Down Expand Up @@ -172,8 +172,8 @@ public TimeValue getMaxRetryDelay() {
return maxRetryDelay;
}

public TimeValue getIdleShardRetryDelay() {
return idleShardRetryDelay;
public TimeValue getPollTimeout() {
return pollTimeout;
}

public String getTaskId() {
Expand Down Expand Up @@ -204,7 +204,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(maxConcurrentWriteBatches);
out.writeVInt(maxWriteBufferSize);
out.writeTimeValue(maxRetryDelay);
out.writeTimeValue(idleShardRetryDelay);
out.writeTimeValue(pollTimeout);
out.writeString(recordedLeaderIndexHistoryUUID);
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
}
Expand All @@ -231,7 +231,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep());
builder.field(IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep());
builder.field(POLL_TIMEOUT.getPreferredName(), pollTimeout.getStringRep());
builder.field(RECORDED_HISTORY_UUID.getPreferredName(), recordedLeaderIndexHistoryUUID);
builder.field(HEADERS.getPreferredName(), headers);
return builder.endObject();
Expand All @@ -251,7 +251,7 @@ public boolean equals(Object o) {
maxBatchSizeInBytes == that.maxBatchSizeInBytes &&
maxWriteBufferSize == that.maxWriteBufferSize &&
Objects.equals(maxRetryDelay, that.maxRetryDelay) &&
Objects.equals(idleShardRetryDelay, that.idleShardRetryDelay) &&
Objects.equals(pollTimeout, that.pollTimeout) &&
Objects.equals(recordedLeaderIndexHistoryUUID, that.recordedLeaderIndexHistoryUUID) &&
Objects.equals(headers, that.headers);
}
Expand All @@ -268,7 +268,7 @@ public int hashCode() {
maxBatchSizeInBytes,
maxWriteBufferSize,
maxRetryDelay,
idleShardRetryDelay,
pollTimeout,
recordedLeaderIndexHistoryUUID,
headers
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co
request.setFromSeqNo(from);
request.setMaxOperationCount(maxOperationCount);
request.setMaxOperationSizeInBytes(params.getMaxBatchSizeInBytes());
request.setPollTimeout(params.getPollTimeout());
leaderClient.execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler));
}
};
Expand Down
Loading