Skip to content

Commit

Permalink
Expose retries for CCR fetch failures (#33694)
Browse files Browse the repository at this point in the history
This commit exposes the number of times that a fetch has been tried to
the CCR stats endpoint, and to CCR monitoring.
  • Loading branch information
jasontedor authored Sep 14, 2018
1 parent 140c3bb commit 2282150
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -36,6 +37,7 @@
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

/**
* The node task that fetch the write operations from a leader shard and
Expand Down Expand Up @@ -72,7 +74,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private long numberOfOperationsIndexed = 0;
private long lastFetchTime = -1;
private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo));
private final LinkedHashMap<Long, ElasticsearchException> fetchExceptions;
private final LinkedHashMap<Long, Tuple<AtomicInteger, ElasticsearchException>> fetchExceptions;

ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers,
ShardFollowTask params, BiConsumer<TimeValue, Runnable> scheduler, final LongSupplier relativeTimeProvider) {
Expand All @@ -87,9 +89,9 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
* concurrent fetches. For each failed fetch, we track the from sequence number associated with the request, and we clear the entry
* when the fetch task associated with that from sequence number succeeds.
*/
this.fetchExceptions = new LinkedHashMap<Long, ElasticsearchException>() {
this.fetchExceptions = new LinkedHashMap<Long, Tuple<AtomicInteger, ElasticsearchException>>() {
@Override
protected boolean removeEldestEntry(final Map.Entry<Long, ElasticsearchException> eldest) {
protected boolean removeEldestEntry(final Map.Entry<Long, Tuple<AtomicInteger, ElasticsearchException>> eldest) {
return size() > params.getMaxConcurrentReadBatches();
}
};
Expand Down Expand Up @@ -240,7 +242,7 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR
synchronized (ShardFollowNodeTask.this) {
totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
numberOfFailedFetches++;
fetchExceptions.put(from, new ElasticsearchException(e));
fetchExceptions.put(from, Tuple.tuple(retryCounter, new ElasticsearchException(e)));
}
handleFailure(e, retryCounter, () -> sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo, retryCounter));
});
Expand Down Expand Up @@ -438,7 +440,12 @@ public synchronized ShardFollowNodeTaskStatus getStatus() {
numberOfSuccessfulBulkOperations,
numberOfFailedBulkOperations,
numberOfOperationsIndexed,
new TreeMap<>(fetchExceptions),
new TreeMap<>(
fetchExceptions
.entrySet()
.stream()
.collect(
Collectors.toMap(Map.Entry::getKey, e -> Tuple.tuple(e.getValue().v1().get(), e.getValue().v2())))),
timeSinceLastFetchMillis);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
Expand Down Expand Up @@ -83,15 +84,17 @@ protected void assertEqualInstances(final ShardFollowNodeTaskStatus expectedInst
assertThat(newInstance.numberOfOperationsIndexed(), equalTo(expectedInstance.numberOfOperationsIndexed()));
assertThat(newInstance.fetchExceptions().size(), equalTo(expectedInstance.fetchExceptions().size()));
assertThat(newInstance.fetchExceptions().keySet(), equalTo(expectedInstance.fetchExceptions().keySet()));
for (final Map.Entry<Long, ElasticsearchException> entry : newInstance.fetchExceptions().entrySet()) {
for (final Map.Entry<Long, Tuple<Integer, ElasticsearchException>> entry : newInstance.fetchExceptions().entrySet()) {
final Tuple<Integer, ElasticsearchException> expectedTuple = expectedInstance.fetchExceptions().get(entry.getKey());
assertThat(entry.getValue().v1(), equalTo(expectedTuple.v1()));
// x-content loses the exception
final ElasticsearchException expected = expectedInstance.fetchExceptions().get(entry.getKey());
assertThat(entry.getValue().getMessage(), containsString(expected.getMessage()));
assertNotNull(entry.getValue().getCause());
final ElasticsearchException expected = expectedTuple.v2();
assertThat(entry.getValue().v2().getMessage(), containsString(expected.getMessage()));
assertNotNull(entry.getValue().v2().getCause());
assertThat(
entry.getValue().getCause(),
entry.getValue().v2().getCause(),
anyOf(instanceOf(ElasticsearchException.class), instanceOf(IllegalStateException.class)));
assertThat(entry.getValue().getCause().getMessage(), containsString(expected.getCause().getMessage()));
assertThat(entry.getValue().v2().getCause().getMessage(), containsString(expected.getCause().getMessage()));
}
assertThat(newInstance.timeSinceLastFetchMillis(), equalTo(expectedInstance.timeSinceLastFetchMillis()));
}
Expand All @@ -101,11 +104,15 @@ protected boolean assertToXContentEquivalence() {
return false;
}

private NavigableMap<Long, ElasticsearchException> randomReadExceptions() {
private NavigableMap<Long, Tuple<Integer, ElasticsearchException>> randomReadExceptions() {
final int count = randomIntBetween(0, 16);
final NavigableMap<Long, ElasticsearchException> readExceptions = new TreeMap<>();
final NavigableMap<Long, Tuple<Integer, ElasticsearchException>> readExceptions = new TreeMap<>();
for (int i = 0; i < count; i++) {
readExceptions.put(randomNonNegativeLong(), new ElasticsearchException(new IllegalStateException("index [" + i + "]")));
readExceptions.put(
randomNonNegativeLong(),
Tuple.tuple(
randomIntBetween(0, Integer.MAX_VALUE),
new ElasticsearchException(new IllegalStateException("index [" + i + "]"))));
}
return readExceptions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
Expand Down Expand Up @@ -192,12 +193,13 @@ public void testReceiveRetryableError() {
assertThat(status.numberOfFailedFetches(), equalTo(retryCounter.get()));
if (retryCounter.get() > 0) {
assertThat(status.fetchExceptions().entrySet(), hasSize(1));
final Map.Entry<Long, ElasticsearchException> entry = status.fetchExceptions().entrySet().iterator().next();
final Map.Entry<Long, Tuple<Integer, ElasticsearchException>> entry = status.fetchExceptions().entrySet().iterator().next();
assertThat(entry.getValue().v1(), equalTo(Math.toIntExact(retryCounter.get())));
assertThat(entry.getKey(), equalTo(0L));
assertThat(entry.getValue(), instanceOf(ElasticsearchException.class));
assertNotNull(entry.getValue().getCause());
assertThat(entry.getValue().getCause(), instanceOf(ShardNotFoundException.class));
final ShardNotFoundException cause = (ShardNotFoundException) entry.getValue().getCause();
assertThat(entry.getValue().v2(), instanceOf(ElasticsearchException.class));
assertNotNull(entry.getValue().v2().getCause());
assertThat(entry.getValue().v2().getCause(), instanceOf(ShardNotFoundException.class));
final ShardNotFoundException cause = (ShardNotFoundException) entry.getValue().v2().getCause();
assertThat(cause.getShardId().getIndexName(), equalTo("leader_index"));
assertThat(cause.getShardId().getId(), equalTo(0));
}
Expand Down Expand Up @@ -253,12 +255,12 @@ public void testReceiveNonRetryableError() {
assertThat(status.numberOfConcurrentWrites(), equalTo(0));
assertThat(status.numberOfFailedFetches(), equalTo(1L));
assertThat(status.fetchExceptions().entrySet(), hasSize(1));
final Map.Entry<Long, ElasticsearchException> entry = status.fetchExceptions().entrySet().iterator().next();
final Map.Entry<Long, Tuple<Integer, ElasticsearchException>> entry = status.fetchExceptions().entrySet().iterator().next();
assertThat(entry.getKey(), equalTo(0L));
assertThat(entry.getValue(), instanceOf(ElasticsearchException.class));
assertNotNull(entry.getValue().getCause());
assertThat(entry.getValue().getCause(), instanceOf(RuntimeException.class));
final RuntimeException cause = (RuntimeException) entry.getValue().getCause();
assertThat(entry.getValue().v2(), instanceOf(ElasticsearchException.class));
assertNotNull(entry.getValue().v2().getCause());
assertThat(entry.getValue().v2().getCause(), instanceOf(RuntimeException.class));
final RuntimeException cause = (RuntimeException) entry.getValue().v2().getCause();
assertThat(cause.getMessage(), equalTo("replication failed"));
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeUnit;
Expand Down Expand Up @@ -84,17 +85,17 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
(long) args[19],
(long) args[20],
new TreeMap<>(
((List<Map.Entry<Long, ElasticsearchException>>) args[21])
((List<Map.Entry<Long, Tuple<Integer, ElasticsearchException>>>) args[21])
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))),
(long) args[22]));

public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry";

static final ConstructingObjectParser<Map.Entry<Long, ElasticsearchException>, Void> FETCH_EXCEPTIONS_ENTRY_PARSER =
static final ConstructingObjectParser<Map.Entry<Long, Tuple<Integer, ElasticsearchException>>, Void> FETCH_EXCEPTIONS_ENTRY_PARSER =
new ConstructingObjectParser<>(
FETCH_EXCEPTIONS_ENTRY_PARSER_NAME,
args -> new AbstractMap.SimpleEntry<>((long) args[0], (ElasticsearchException) args[1]));
args -> new AbstractMap.SimpleEntry<>((long) args[0], Tuple.tuple((Integer)args[1], (ElasticsearchException)args[2])));

static {
STATUS_PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_INDEX);
Expand Down Expand Up @@ -123,10 +124,12 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
}

static final ParseField FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO = new ParseField("from_seq_no");
static final ParseField FETCH_EXCEPTIONS_RETRIES = new ParseField("retries");
static final ParseField FETCH_EXCEPTIONS_ENTRY_EXCEPTION = new ParseField("exception");

static {
FETCH_EXCEPTIONS_ENTRY_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO);
FETCH_EXCEPTIONS_ENTRY_PARSER.declareInt(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_RETRIES);
FETCH_EXCEPTIONS_ENTRY_PARSER.declareObject(
ConstructingObjectParser.constructorArg(),
(p, c) -> ElasticsearchException.fromXContent(p),
Expand Down Expand Up @@ -259,9 +262,9 @@ public long numberOfOperationsIndexed() {
return numberOfOperationsIndexed;
}

private final NavigableMap<Long, ElasticsearchException> fetchExceptions;
private final NavigableMap<Long, Tuple<Integer, ElasticsearchException>> fetchExceptions;

public NavigableMap<Long, ElasticsearchException> fetchExceptions() {
public NavigableMap<Long, Tuple<Integer, ElasticsearchException>> fetchExceptions() {
return fetchExceptions;
}

Expand Down Expand Up @@ -293,7 +296,7 @@ public ShardFollowNodeTaskStatus(
final long numberOfSuccessfulBulkOperations,
final long numberOfFailedBulkOperations,
final long numberOfOperationsIndexed,
final NavigableMap<Long, ElasticsearchException> fetchExceptions,
final NavigableMap<Long, Tuple<Integer, ElasticsearchException>> fetchExceptions,
final long timeSinceLastFetchMillis) {
this.leaderIndex = leaderIndex;
this.followerIndex = followerIndex;
Expand Down Expand Up @@ -342,7 +345,8 @@ public ShardFollowNodeTaskStatus(final StreamInput in) throws IOException {
this.numberOfSuccessfulBulkOperations = in.readVLong();
this.numberOfFailedBulkOperations = in.readVLong();
this.numberOfOperationsIndexed = in.readVLong();
this.fetchExceptions = new TreeMap<>(in.readMap(StreamInput::readVLong, StreamInput::readException));
this.fetchExceptions =
new TreeMap<>(in.readMap(StreamInput::readVLong, stream -> Tuple.tuple(stream.readVInt(), stream.readException())));
this.timeSinceLastFetchMillis = in.readZLong();
}

Expand Down Expand Up @@ -374,7 +378,10 @@ public void writeTo(final StreamOutput out) throws IOException {
out.writeVLong(numberOfSuccessfulBulkOperations);
out.writeVLong(numberOfFailedBulkOperations);
out.writeVLong(numberOfOperationsIndexed);
out.writeMap(fetchExceptions, StreamOutput::writeVLong, StreamOutput::writeException);
out.writeMap(
fetchExceptions,
StreamOutput::writeVLong,
(stream, value) -> { stream.writeVInt(value.v1()); stream.writeException(value.v2()); });
out.writeZLong(timeSinceLastFetchMillis);
}

Expand Down Expand Up @@ -421,14 +428,15 @@ public XContentBuilder toXContentFragment(final XContentBuilder builder, final P
builder.field(NUMBER_OF_OPERATIONS_INDEXED_FIELD.getPreferredName(), numberOfOperationsIndexed);
builder.startArray(FETCH_EXCEPTIONS.getPreferredName());
{
for (final Map.Entry<Long, ElasticsearchException> entry : fetchExceptions.entrySet()) {
for (final Map.Entry<Long, Tuple<Integer, ElasticsearchException>> entry : fetchExceptions.entrySet()) {
builder.startObject();
{
builder.field(FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO.getPreferredName(), entry.getKey());
builder.field(FETCH_EXCEPTIONS_RETRIES.getPreferredName(), entry.getValue().v1());
builder.field(FETCH_EXCEPTIONS_ENTRY_EXCEPTION.getPreferredName());
builder.startObject();
{
ElasticsearchException.generateThrowableXContent(builder, params, entry.getValue());
ElasticsearchException.generateThrowableXContent(builder, params, entry.getValue().v2());
}
builder.endObject();
}
Expand Down Expand Up @@ -515,7 +523,7 @@ public int hashCode() {
}

private static List<String> getFetchExceptionMessages(final ShardFollowNodeTaskStatus status) {
return status.fetchExceptions().values().stream().map(ElasticsearchException::getMessage).collect(Collectors.toList());
return status.fetchExceptions().values().stream().map(t -> t.v2().getMessage()).collect(Collectors.toList());
}

public String toString() {
Expand Down
3 changes: 3 additions & 0 deletions x-pack/plugin/core/src/main/resources/monitoring-es.json
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,9 @@
"from_seq_no": {
"type": "long"
},
"retries": {
"type": "integer"
},
"exception": {
"type": "text"
}
Expand Down
Loading

0 comments on commit 2282150

Please sign in to comment.