Skip to content

Commit

Permalink
Prepare ShardFollowNodeTask to bootstrap when it fall behind leader s…
Browse files Browse the repository at this point in the history
…hard (#37562)

* Changed `LuceneSnapshot` to throw an `OperationsMissingException` if the requested ops are missing.
* Changed the shard changes api to handle the `OperationsMissingException` and wrap the exception into `ResourceNotFound` exception and include metadata to indicate the requested range can no longer be retrieved.
* Changed `ShardFollowNodeTask` to handle this `ResourceNotFound` exception with the included metdata header.

Relates to #35975
  • Loading branch information
martijnvg committed Jan 28, 2019
1 parent 14adf8a commit c8905da
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,13 @@ public Translog.Operation next() throws IOException {
private void rangeCheck(Translog.Operation op) {
if (op == null) {
if (lastSeenSeqNo < toSeqNo) {
throw new IllegalStateException("Not all operations between from_seqno [" + fromSeqNo + "] " +
throw new MissingHistoryOperationsException("Not all operations between from_seqno [" + fromSeqNo + "] " +
"and to_seqno [" + toSeqNo + "] found; prematurely terminated last_seen_seqno [" + lastSeenSeqNo + "]");
}
} else {
final long expectedSeqNo = lastSeenSeqNo + 1;
if (op.seqNo() != expectedSeqNo) {
throw new IllegalStateException("Not all operations between from_seqno [" + fromSeqNo + "] " +
throw new MissingHistoryOperationsException("Not all operations between from_seqno [" + fromSeqNo + "] " +
"and to_seqno [" + toSeqNo + "] found; expected seqno [" + expectedSeqNo + "]; found [" + op + "]");
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.engine;

/**
* Exception indicating that not all requested operations from {@link LuceneChangesSnapshot}
* are available.
*/
public final class MissingHistoryOperationsException extends IllegalStateException {

MissingHistoryOperationsException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY = "leader_index_name";
public static final String CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY = "remote_cluster_name";

public static final String REQUESTED_OPS_MISSING_METADATA_KEY = "es.requested_operations_missing";

private final boolean enabled;
private final Settings settings;
private final CcrLicenseChecker ccrLicenseChecker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
package org.elasticsearch.xpack.ccr.action;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
Expand All @@ -30,16 +29,17 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.MissingHistoryOperationsException;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardNotStartedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.Ccr;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -408,21 +408,6 @@ protected void asyncShardOperation(
}
}

@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
ActionListener<Response> wrappedListener = ActionListener.wrap(listener::onResponse, e -> {
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof IllegalStateException && cause.getMessage().contains("Not all operations between from_seqno [")) {
String message = "Operations are no longer available for replicating. Maybe increase the retention setting [" +
IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey() + "]?";
listener.onFailure(new ElasticsearchException(message, e));
} else {
listener.onFailure(e);
}
});
super.doExecute(task, request, wrappedListener);
}

private void globalCheckpointAdvanced(
final ShardId shardId,
final long globalCheckpoint,
Expand Down Expand Up @@ -541,6 +526,14 @@ static Translog.Operation[] getOperations(
break;
}
}
} catch (MissingHistoryOperationsException e) {
String message = "Operations are no longer available for replicating. Maybe increase the retention setting [" +
IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey() + "]?";
// Make it easy to detect this error in ShardFollowNodeTask:
// (adding a metadata header instead of introducing a new exception that extends ElasticsearchException)
ResourceNotFoundException wrapper = new ResourceNotFoundException(message, e);
wrapper.addMetadata(Ccr.REQUESTED_OPS_MISSING_METADATA_KEY, Long.toString(fromSeqNo), Long.toString(toSeqNo));
throw wrapper;
}
return operations.toArray(EMPTY_OPERATIONS_ARRAY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand All @@ -30,6 +31,7 @@
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.transport.NoSuchRemoteClusterException;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
Expand Down Expand Up @@ -275,6 +277,14 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR
failedReadRequests++;
fetchExceptions.put(from, Tuple.tuple(retryCounter, ExceptionsHelper.convertToElastic(e)));
}
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof ResourceNotFoundException) {
ResourceNotFoundException resourceNotFoundException = (ResourceNotFoundException) cause;
if (resourceNotFoundException.getMetadataKeys().contains(Ccr.REQUESTED_OPS_MISSING_METADATA_KEY)) {
handleFallenBehindLeaderShard(e, from, maxOperationCount, maxRequiredSeqNo, retryCounter);
return;
}
}
handleFailure(e, retryCounter, () -> sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo, retryCounter));
});
}
Expand All @@ -291,6 +301,18 @@ void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Res
maybeUpdateSettings(response.getSettingsVersion(), updateMappingsTask);
}

void handleFallenBehindLeaderShard(Exception e, long from, int maxOperationCount, long maxRequiredSeqNo, AtomicInteger retryCounter) {
// Do restore from repository here and after that
// start() should be invoked and stats should be reset

// For now handle like any other failure:
// need a more robust approach to avoid the scenario where an outstanding request
// can trigger another restore while the shard was restored already.
// https://github.com/elastic/elasticsearch/pull/37562#discussion_r250009367

handleFailure(e, retryCounter, () -> sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo, retryCounter));
}

/** Called when some operations are fetched from the leading */
protected void onOperationsFetched(Translog.Operation[] operations) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
Expand All @@ -16,12 +19,17 @@
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.LocalStateCcr;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;

public class ShardChangesTests extends ESSingleNodeTestCase {

Expand Down Expand Up @@ -88,7 +96,7 @@ public void testGetOperationsBasedOnGlobalSequenceId() throws Exception {
assertThat(operation.id(), equalTo("5"));
}

public void testMissingOperations() {
public void testMissingOperations() throws Exception {
client().admin().indices().prepareCreate("index")
.setSettings(Settings.builder()
.put("index.soft_deletes.enabled", true)
Expand All @@ -113,9 +121,34 @@ public void testMissingOperations() {
request.setFromSeqNo(0L);
request.setMaxOperationCount(1);

Exception e = expectThrows(ElasticsearchException.class, () -> client().execute(ShardChangesAction.INSTANCE, request).actionGet());
assertThat(e.getMessage(), equalTo("Operations are no longer available for replicating. Maybe increase the retention setting " +
"[index.soft_deletes.retention.operations]?"));
{
ResourceNotFoundException e =
expectThrows(ResourceNotFoundException.class, () -> client().execute(ShardChangesAction.INSTANCE, request).actionGet());
assertThat(e.getMessage(), equalTo("Operations are no longer available for replicating. Maybe increase the retention setting " +
"[index.soft_deletes.retention.operations]?"));

assertThat(e.getMetadataKeys().size(), equalTo(1));
assertThat(e.getMetadata(Ccr.REQUESTED_OPS_MISSING_METADATA_KEY), notNullValue());
assertThat(e.getMetadata(Ccr.REQUESTED_OPS_MISSING_METADATA_KEY), contains("0", "0"));
}
{
AtomicReference<Exception> holder = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
client().execute(ShardChangesAction.INSTANCE, request,
new LatchedActionListener<>(ActionListener.wrap(r -> fail("expected an exception"), holder::set), latch));
latch.await();

ElasticsearchException e = (ElasticsearchException) holder.get();
assertThat(e, notNullValue());
assertThat(e.getMetadataKeys().size(), equalTo(0));

ResourceNotFoundException cause = (ResourceNotFoundException) e.getCause();
assertThat(cause.getMessage(), equalTo("Operations are no longer available for replicating. " +
"Maybe increase the retention setting [index.soft_deletes.retention.operations]?"));
assertThat(cause.getMetadataKeys().size(), equalTo(1));
assertThat(cause.getMetadata(Ccr.REQUESTED_OPS_MISSING_METADATA_KEY), notNullValue());
assertThat(cause.getMetadata(Ccr.REQUESTED_OPS_MISSING_METADATA_KEY), contains("0", "0"));
}
}

}

0 comments on commit c8905da

Please sign in to comment.