Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose retries for CCR fetch failures #33694

Merged
merged 2 commits into from
Sep 14, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -36,6 +37,7 @@
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

/**
* The node task that fetch the write operations from a leader shard and
Expand Down Expand Up @@ -72,7 +74,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private long numberOfOperationsIndexed = 0;
private long lastFetchTime = -1;
private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo));
private final LinkedHashMap<Long, ElasticsearchException> fetchExceptions;
private final LinkedHashMap<Long, Tuple<AtomicInteger, ElasticsearchException>> fetchExceptions;

ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers,
ShardFollowTask params, BiConsumer<TimeValue, Runnable> scheduler, final LongSupplier relativeTimeProvider) {
Expand All @@ -87,9 +89,9 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
* concurrent fetches. For each failed fetch, we track the from sequence number associated with the request, and we clear the entry
* when the fetch task associated with that from sequence number succeeds.
*/
this.fetchExceptions = new LinkedHashMap<Long, ElasticsearchException>() {
this.fetchExceptions = new LinkedHashMap<Long, Tuple<AtomicInteger, ElasticsearchException>>() {
@Override
protected boolean removeEldestEntry(final Map.Entry<Long, ElasticsearchException> eldest) {
protected boolean removeEldestEntry(final Map.Entry<Long, Tuple<AtomicInteger, ElasticsearchException>> eldest) {
return size() > params.getMaxConcurrentReadBatches();
}
};
Expand Down Expand Up @@ -240,7 +242,7 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR
synchronized (ShardFollowNodeTask.this) {
totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
numberOfFailedFetches++;
fetchExceptions.put(from, new ElasticsearchException(e));
fetchExceptions.put(from, Tuple.tuple(retryCounter, new ElasticsearchException(e)));
}
handleFailure(e, retryCounter, () -> sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo, retryCounter));
});
Expand Down Expand Up @@ -438,7 +440,12 @@ public synchronized ShardFollowNodeTaskStatus getStatus() {
numberOfSuccessfulBulkOperations,
numberOfFailedBulkOperations,
numberOfOperationsIndexed,
new TreeMap<>(fetchExceptions),
new TreeMap<>(
fetchExceptions
.entrySet()
.stream()
.collect(
Collectors.toMap(Map.Entry::getKey, e -> Tuple.tuple(e.getValue().v1().get(), e.getValue().v2())))),
timeSinceLastFetchMillis);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
Expand Down Expand Up @@ -83,15 +84,17 @@ protected void assertEqualInstances(final ShardFollowNodeTaskStatus expectedInst
assertThat(newInstance.numberOfOperationsIndexed(), equalTo(expectedInstance.numberOfOperationsIndexed()));
assertThat(newInstance.fetchExceptions().size(), equalTo(expectedInstance.fetchExceptions().size()));
assertThat(newInstance.fetchExceptions().keySet(), equalTo(expectedInstance.fetchExceptions().keySet()));
for (final Map.Entry<Long, ElasticsearchException> entry : newInstance.fetchExceptions().entrySet()) {
for (final Map.Entry<Long, Tuple<Integer, ElasticsearchException>> entry : newInstance.fetchExceptions().entrySet()) {
final Tuple<Integer, ElasticsearchException> expectedTuple = expectedInstance.fetchExceptions().get(entry.getKey());
assertThat(entry.getValue().v1(), equalTo(expectedTuple.v1()));
// x-content loses the exception
final ElasticsearchException expected = expectedInstance.fetchExceptions().get(entry.getKey());
assertThat(entry.getValue().getMessage(), containsString(expected.getMessage()));
assertNotNull(entry.getValue().getCause());
final ElasticsearchException expected = expectedTuple.v2();
assertThat(entry.getValue().v2().getMessage(), containsString(expected.getMessage()));
assertNotNull(entry.getValue().v2().getCause());
assertThat(
entry.getValue().getCause(),
entry.getValue().v2().getCause(),
anyOf(instanceOf(ElasticsearchException.class), instanceOf(IllegalStateException.class)));
assertThat(entry.getValue().getCause().getMessage(), containsString(expected.getCause().getMessage()));
assertThat(entry.getValue().v2().getCause().getMessage(), containsString(expected.getCause().getMessage()));
}
assertThat(newInstance.timeSinceLastFetchMillis(), equalTo(expectedInstance.timeSinceLastFetchMillis()));
}
Expand All @@ -101,11 +104,15 @@ protected boolean assertToXContentEquivalence() {
return false;
}

private NavigableMap<Long, ElasticsearchException> randomReadExceptions() {
private NavigableMap<Long, Tuple<Integer, ElasticsearchException>> randomReadExceptions() {
final int count = randomIntBetween(0, 16);
final NavigableMap<Long, ElasticsearchException> readExceptions = new TreeMap<>();
final NavigableMap<Long, Tuple<Integer, ElasticsearchException>> readExceptions = new TreeMap<>();
for (int i = 0; i < count; i++) {
readExceptions.put(randomNonNegativeLong(), new ElasticsearchException(new IllegalStateException("index [" + i + "]")));
readExceptions.put(
randomNonNegativeLong(),
Tuple.tuple(
randomIntBetween(0, Integer.MAX_VALUE),
new ElasticsearchException(new IllegalStateException("index [" + i + "]"))));
}
return readExceptions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
Expand Down Expand Up @@ -192,12 +193,13 @@ public void testReceiveRetryableError() {
assertThat(status.numberOfFailedFetches(), equalTo(retryCounter.get()));
if (retryCounter.get() > 0) {
assertThat(status.fetchExceptions().entrySet(), hasSize(1));
final Map.Entry<Long, ElasticsearchException> entry = status.fetchExceptions().entrySet().iterator().next();
final Map.Entry<Long, Tuple<Integer, ElasticsearchException>> entry = status.fetchExceptions().entrySet().iterator().next();
assertThat(entry.getValue().v1(), equalTo(Math.toIntExact(retryCounter.get())));
assertThat(entry.getKey(), equalTo(0L));
assertThat(entry.getValue(), instanceOf(ElasticsearchException.class));
assertNotNull(entry.getValue().getCause());
assertThat(entry.getValue().getCause(), instanceOf(ShardNotFoundException.class));
final ShardNotFoundException cause = (ShardNotFoundException) entry.getValue().getCause();
assertThat(entry.getValue().v2(), instanceOf(ElasticsearchException.class));
assertNotNull(entry.getValue().v2().getCause());
assertThat(entry.getValue().v2().getCause(), instanceOf(ShardNotFoundException.class));
final ShardNotFoundException cause = (ShardNotFoundException) entry.getValue().v2().getCause();
assertThat(cause.getShardId().getIndexName(), equalTo("leader_index"));
assertThat(cause.getShardId().getId(), equalTo(0));
}
Expand Down Expand Up @@ -253,12 +255,12 @@ public void testReceiveNonRetryableError() {
assertThat(status.numberOfConcurrentWrites(), equalTo(0));
assertThat(status.numberOfFailedFetches(), equalTo(1L));
assertThat(status.fetchExceptions().entrySet(), hasSize(1));
final Map.Entry<Long, ElasticsearchException> entry = status.fetchExceptions().entrySet().iterator().next();
final Map.Entry<Long, Tuple<Integer, ElasticsearchException>> entry = status.fetchExceptions().entrySet().iterator().next();
assertThat(entry.getKey(), equalTo(0L));
assertThat(entry.getValue(), instanceOf(ElasticsearchException.class));
assertNotNull(entry.getValue().getCause());
assertThat(entry.getValue().getCause(), instanceOf(RuntimeException.class));
final RuntimeException cause = (RuntimeException) entry.getValue().getCause();
assertThat(entry.getValue().v2(), instanceOf(ElasticsearchException.class));
assertNotNull(entry.getValue().v2().getCause());
assertThat(entry.getValue().v2().getCause(), instanceOf(RuntimeException.class));
final RuntimeException cause = (RuntimeException) entry.getValue().v2().getCause();
assertThat(cause.getMessage(), equalTo("replication failed"));
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeUnit;
Expand Down Expand Up @@ -84,17 +85,17 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
(long) args[19],
(long) args[20],
new TreeMap<>(
((List<Map.Entry<Long, ElasticsearchException>>) args[21])
((List<Map.Entry<Long, Tuple<Integer, ElasticsearchException>>>) args[21])
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))),
(long) args[22]));

public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry";

static final ConstructingObjectParser<Map.Entry<Long, ElasticsearchException>, Void> FETCH_EXCEPTIONS_ENTRY_PARSER =
static final ConstructingObjectParser<Map.Entry<Long, Tuple<Integer, ElasticsearchException>>, Void> FETCH_EXCEPTIONS_ENTRY_PARSER =
new ConstructingObjectParser<>(
FETCH_EXCEPTIONS_ENTRY_PARSER_NAME,
args -> new AbstractMap.SimpleEntry<>((long) args[0], (ElasticsearchException) args[1]));
args -> new AbstractMap.SimpleEntry<>((long) args[0], Tuple.tuple((Integer)args[1], (ElasticsearchException)args[2])));

static {
STATUS_PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_INDEX);
Expand Down Expand Up @@ -123,10 +124,12 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
}

static final ParseField FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO = new ParseField("from_seq_no");
static final ParseField FETCH_EXCEPTIONS_RETRIES = new ParseField("retries");
static final ParseField FETCH_EXCEPTIONS_ENTRY_EXCEPTION = new ParseField("exception");

static {
FETCH_EXCEPTIONS_ENTRY_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO);
FETCH_EXCEPTIONS_ENTRY_PARSER.declareInt(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_RETRIES);
FETCH_EXCEPTIONS_ENTRY_PARSER.declareObject(
ConstructingObjectParser.constructorArg(),
(p, c) -> ElasticsearchException.fromXContent(p),
Expand Down Expand Up @@ -259,9 +262,9 @@ public long numberOfOperationsIndexed() {
return numberOfOperationsIndexed;
}

private final NavigableMap<Long, ElasticsearchException> fetchExceptions;
private final NavigableMap<Long, Tuple<Integer, ElasticsearchException>> fetchExceptions;

public NavigableMap<Long, ElasticsearchException> fetchExceptions() {
public NavigableMap<Long, Tuple<Integer, ElasticsearchException>> fetchExceptions() {
return fetchExceptions;
}

Expand Down Expand Up @@ -293,7 +296,7 @@ public ShardFollowNodeTaskStatus(
final long numberOfSuccessfulBulkOperations,
final long numberOfFailedBulkOperations,
final long numberOfOperationsIndexed,
final NavigableMap<Long, ElasticsearchException> fetchExceptions,
final NavigableMap<Long, Tuple<Integer, ElasticsearchException>> fetchExceptions,
final long timeSinceLastFetchMillis) {
this.leaderIndex = leaderIndex;
this.followerIndex = followerIndex;
Expand Down Expand Up @@ -342,7 +345,8 @@ public ShardFollowNodeTaskStatus(final StreamInput in) throws IOException {
this.numberOfSuccessfulBulkOperations = in.readVLong();
this.numberOfFailedBulkOperations = in.readVLong();
this.numberOfOperationsIndexed = in.readVLong();
this.fetchExceptions = new TreeMap<>(in.readMap(StreamInput::readVLong, StreamInput::readException));
this.fetchExceptions =
new TreeMap<>(in.readMap(StreamInput::readVLong, stream -> Tuple.tuple(stream.readVInt(), stream.readException())));
this.timeSinceLastFetchMillis = in.readZLong();
}

Expand Down Expand Up @@ -374,7 +378,10 @@ public void writeTo(final StreamOutput out) throws IOException {
out.writeVLong(numberOfSuccessfulBulkOperations);
out.writeVLong(numberOfFailedBulkOperations);
out.writeVLong(numberOfOperationsIndexed);
out.writeMap(fetchExceptions, StreamOutput::writeVLong, StreamOutput::writeException);
out.writeMap(
fetchExceptions,
StreamOutput::writeVLong,
(stream, value) -> { stream.writeVInt(value.v1()); stream.writeException(value.v2()); });
out.writeZLong(timeSinceLastFetchMillis);
}

Expand Down Expand Up @@ -421,14 +428,15 @@ public XContentBuilder toXContentFragment(final XContentBuilder builder, final P
builder.field(NUMBER_OF_OPERATIONS_INDEXED_FIELD.getPreferredName(), numberOfOperationsIndexed);
builder.startArray(FETCH_EXCEPTIONS.getPreferredName());
{
for (final Map.Entry<Long, ElasticsearchException> entry : fetchExceptions.entrySet()) {
for (final Map.Entry<Long, Tuple<Integer, ElasticsearchException>> entry : fetchExceptions.entrySet()) {
builder.startObject();
{
builder.field(FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO.getPreferredName(), entry.getKey());
builder.field(FETCH_EXCEPTIONS_RETRIES.getPreferredName(), entry.getValue().v1());
builder.field(FETCH_EXCEPTIONS_ENTRY_EXCEPTION.getPreferredName());
builder.startObject();
{
ElasticsearchException.generateThrowableXContent(builder, params, entry.getValue());
ElasticsearchException.generateThrowableXContent(builder, params, entry.getValue().v2());
}
builder.endObject();
}
Expand Down Expand Up @@ -515,7 +523,7 @@ public int hashCode() {
}

private static List<String> getFetchExceptionMessages(final ShardFollowNodeTaskStatus status) {
return status.fetchExceptions().values().stream().map(ElasticsearchException::getMessage).collect(Collectors.toList());
return status.fetchExceptions().values().stream().map(t -> t.v2().getMessage()).collect(Collectors.toList());
}

public String toString() {
Expand Down
3 changes: 3 additions & 0 deletions x-pack/plugin/core/src/main/resources/monitoring-es.json
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,9 @@
"from_seq_no": {
"type": "long"
},
"retries": {
"type": "integer"
},
"exception": {
"type": "text"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
Expand Down Expand Up @@ -101,8 +102,10 @@ public void testToXContent() throws IOException {
final long numberOfSuccessfulBulkOperations = randomNonNegativeLong();
final long numberOfFailedBulkOperations = randomNonNegativeLong();
final long numberOfOperationsIndexed = randomNonNegativeLong();
final NavigableMap<Long, ElasticsearchException> fetchExceptions =
new TreeMap<>(Collections.singletonMap(randomNonNegativeLong(), new ElasticsearchException("shard is sad")));
final NavigableMap<Long, Tuple<Integer, ElasticsearchException>> fetchExceptions =
new TreeMap<>(Collections.singletonMap(
randomNonNegativeLong(),
Tuple.tuple(randomIntBetween(0, Integer.MAX_VALUE), new ElasticsearchException("shard is sad"))));
final long timeSinceLastFetchMillis = randomNonNegativeLong();
final ShardFollowNodeTaskStatus status = new ShardFollowNodeTaskStatus(
"cluster_alias:leader_index",
Expand Down Expand Up @@ -171,6 +174,7 @@ public void testToXContent() throws IOException {
+ "\"fetch_exceptions\":["
+ "{"
+ "\"from_seq_no\":" + fetchExceptions.keySet().iterator().next() + ","
+ "\"retries\":" + fetchExceptions.values().iterator().next().v1() + ","
+ "\"exception\":{"
+ "\"type\":\"exception\","
+ "\"reason\":\"shard is sad\""
Expand All @@ -183,8 +187,8 @@ public void testToXContent() throws IOException {
}

public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException {
martijnvg marked this conversation as resolved.
Show resolved Hide resolved
final NavigableMap<Long, ElasticsearchException> fetchExceptions =
new TreeMap<>(Collections.singletonMap(1L, new ElasticsearchException("shard is sad")));
final NavigableMap<Long, Tuple<Integer, ElasticsearchException>> fetchExceptions =
new TreeMap<>(Collections.singletonMap(1L, Tuple.tuple(2, new ElasticsearchException("shard is sad"))));
final ShardFollowNodeTaskStatus status = new ShardFollowNodeTaskStatus(
"cluster_alias:leader_index",
"follower_index",
Expand Down