diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java
index ecb2a05c68438..d16b850a0a27d 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java
@@ -147,13 +147,13 @@ public Translog.Operation next() throws IOException {
     private void rangeCheck(Translog.Operation op) {
         if (op == null) {
             if (lastSeenSeqNo < toSeqNo) {
-                throw new IllegalStateException("Not all operations between from_seqno [" + fromSeqNo + "] " +
+                throw new MissingHistoryOperationsException("Not all operations between from_seqno [" + fromSeqNo + "] " +
                     "and to_seqno [" + toSeqNo + "] found; prematurely terminated last_seen_seqno [" + lastSeenSeqNo + "]");
             }
         } else {
             final long expectedSeqNo = lastSeenSeqNo + 1;
             if (op.seqNo() != expectedSeqNo) {
-                throw new IllegalStateException("Not all operations between from_seqno [" + fromSeqNo + "] " +
+                throw new MissingHistoryOperationsException("Not all operations between from_seqno [" + fromSeqNo + "] " +
                     "and to_seqno [" + toSeqNo + "] found; expected seqno [" + expectedSeqNo + "]; found [" + op + "]");
             }
         }
diff --git a/server/src/main/java/org/elasticsearch/index/engine/MissingHistoryOperationsException.java b/server/src/main/java/org/elasticsearch/index/engine/MissingHistoryOperationsException.java
new file mode 100644
index 0000000000000..8f2fa1e5b7375
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/index/engine/MissingHistoryOperationsException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.engine;
+
+/**
+ * Thrown when {@link LuceneChangesSnapshot} cannot supply every operation in the requested
+ * sequence-number range, i.e. some of the requested operations are not available.
+ */
+public final class MissingHistoryOperationsException extends IllegalStateException {
+
+    MissingHistoryOperationsException(String message) {
+        super(message);
+    }
+}
diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java
index 4240693f928c3..c98114bcdc9fd 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java
@@ -118,6 +118,8 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
     public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY = "leader_index_name";
     public static final String CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY = "remote_cluster_name";
 
+    public static final String REQUESTED_OPS_MISSING_METADATA_KEY = "es.requested_operations_missing";
+
     private final boolean enabled;
     private final Settings settings;
     private final CcrLicenseChecker ccrLicenseChecker;
diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java
index d1c5c6a44c4b7..c56cce16673b1 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java
@@ -6,8 +6,7 @@
 package org.elasticsearch.xpack.ccr.action;
 
 import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.Action;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequestValidationException;
@@ -30,6 +29,7 @@
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.engine.MissingHistoryOperationsException;
 import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardNotStartedException;
@@ -37,9 +37,9 @@
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.indices.IndicesService;
-import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.ccr.Ccr;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -408,21 +408,6 @@ protected void asyncShardOperation(
             }
         }
 
-        @Override
-        protected void doExecute(Task task, Request request, ActionListener listener) {
-            ActionListener wrappedListener = ActionListener.wrap(listener::onResponse, e -> {
-                Throwable cause = ExceptionsHelper.unwrapCause(e);
-                if (cause instanceof IllegalStateException && cause.getMessage().contains("Not all operations between from_seqno [")) {
-                    String message = "Operations are no longer available for replicating. Maybe increase the retention setting [" +
-                        IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey() + "]?";
-                    listener.onFailure(new ElasticsearchException(message, e));
-                } else {
-                    listener.onFailure(e);
-                }
-            });
-            super.doExecute(task, request, wrappedListener);
-        }
-
         private void globalCheckpointAdvanced(
                 final ShardId shardId,
                 final long globalCheckpoint,
@@ -541,6 +526,14 @@ static Translog.Operation[] getOperations(
                     break;
                 }
             }
+        } catch (MissingHistoryOperationsException e) {
+            String message = "Operations are no longer available for replicating. Maybe increase the retention setting [" +
+                IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey() + "]?";
+            // Tag the wrapper with a metadata header so that ShardFollowNodeTask can easily detect this error
+            // (a metadata header is used instead of introducing a new exception that extends ElasticsearchException).
+            ResourceNotFoundException wrapper = new ResourceNotFoundException(message, e);
+            wrapper.addMetadata(Ccr.REQUESTED_OPS_MISSING_METADATA_KEY, Long.toString(fromSeqNo), Long.toString(toSeqNo));
+            throw wrapper;
         }
         return operations.toArray(EMPTY_OPERATIONS_ARRAY);
     }
diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java
index 7d8e1fa884757..233085c0a6857 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java
@@ -13,6 +13,7 @@
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchSecurityException;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.NoShardAvailableActionException;
 import org.elasticsearch.action.UnavailableShardsException;
 import org.elasticsearch.cluster.block.ClusterBlockException;
@@ -30,6 +31,7 @@
 import org.elasticsearch.persistent.AllocatedPersistentTask;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.transport.ConnectTransportException;
+import org.elasticsearch.xpack.ccr.Ccr;
 import org.elasticsearch.transport.NoSuchRemoteClusterException;
 import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
 import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
@@ -275,6 +277,14 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR
                         failedReadRequests++;
                         fetchExceptions.put(from, Tuple.tuple(retryCounter, ExceptionsHelper.convertToElastic(e)));
                     }
+                    Throwable cause = ExceptionsHelper.unwrapCause(e);
+                    if (cause instanceof ResourceNotFoundException) {
+                        ResourceNotFoundException resourceNotFoundException = (ResourceNotFoundException) cause;
+                        if (resourceNotFoundException.getMetadataKeys().contains(Ccr.REQUESTED_OPS_MISSING_METADATA_KEY)) {
+                            handleFallenBehindLeaderShard(e, from, maxOperationCount, maxRequiredSeqNo, retryCounter);
+                            return;
+                        }
+                    }
                     handleFailure(e, retryCounter, () -> sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo, retryCounter));
                 });
     }
@@ -291,6 +301,18 @@ void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Res
         maybeUpdateSettings(response.getSettingsVersion(), updateMappingsTask);
     }
 
+    void handleFallenBehindLeaderShard(Exception e, long from, int maxOperationCount, long maxRequiredSeqNo, AtomicInteger retryCounter) {
+        // TODO: do the restore from the repository here; once that is done,
+        // start() should be invoked and the stats should be reset.
+
+        // For now this is handled like any other failure. A more robust approach is needed
+        // first, to avoid the scenario where an outstanding request triggers another
+        // restore while the shard was restored already. See:
+        // https://github.com/elastic/elasticsearch/pull/37562#discussion_r250009367
+
+        handleFailure(e, retryCounter, () -> sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo, retryCounter));
+    }
+
     /** Called when some operations are fetched from the leading */
     protected void onOperationsFetched(Translog.Operation[] operations) {
 
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesTests.java
index 6a2a4baab1738..f42a50b91ff02 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesTests.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesTests.java
@@ -6,6 +6,9 @@
 package org.elasticsearch.xpack.ccr.action;
 
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.LatchedActionListener;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
@@ -16,12 +19,17 @@
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESSingleNodeTestCase;
+import org.elasticsearch.xpack.ccr.Ccr;
 import org.elasticsearch.xpack.ccr.LocalStateCcr;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
 
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
 
 public class ShardChangesTests extends ESSingleNodeTestCase {
 
@@ -88,7 +96,7 @@ public void testGetOperationsBasedOnGlobalSequenceId() throws Exception {
         assertThat(operation.id(), equalTo("5"));
     }
 
-    public void testMissingOperations() {
+    public void testMissingOperations() throws Exception {
         client().admin().indices().prepareCreate("index")
             .setSettings(Settings.builder()
                 .put("index.soft_deletes.enabled", true)
@@ -113,9 +121,34 @@ public void testMissingOperations() {
         request.setFromSeqNo(0L);
         request.setMaxOperationCount(1);
 
-        Exception e = expectThrows(ElasticsearchException.class, () -> client().execute(ShardChangesAction.INSTANCE, request).actionGet());
-        assertThat(e.getMessage(), equalTo("Operations are no longer available for replicating. Maybe increase the retention setting " +
-            "[index.soft_deletes.retention.operations]?"));
+        {
+            ResourceNotFoundException e =
+                expectThrows(ResourceNotFoundException.class, () -> client().execute(ShardChangesAction.INSTANCE, request).actionGet());
+            assertThat(e.getMessage(), equalTo("Operations are no longer available for replicating. Maybe increase the retention setting " +
+                "[index.soft_deletes.retention.operations]?"));
+
+            assertThat(e.getMetadataKeys().size(), equalTo(1));
+            assertThat(e.getMetadata(Ccr.REQUESTED_OPS_MISSING_METADATA_KEY), notNullValue());
+            assertThat(e.getMetadata(Ccr.REQUESTED_OPS_MISSING_METADATA_KEY), contains("0", "0"));
+        }
+        {
+            AtomicReference holder = new AtomicReference<>();
+            CountDownLatch latch = new CountDownLatch(1);
+            client().execute(ShardChangesAction.INSTANCE, request,
+                new LatchedActionListener<>(ActionListener.wrap(r -> fail("expected an exception"), holder::set), latch));
+            latch.await();
+
+            ElasticsearchException e = (ElasticsearchException) holder.get();
+            assertThat(e, notNullValue());
+            assertThat(e.getMetadataKeys().size(), equalTo(0));
+
+            ResourceNotFoundException cause = (ResourceNotFoundException) e.getCause();
+            assertThat(cause.getMessage(), equalTo("Operations are no longer available for replicating. " +
+                "Maybe increase the retention setting [index.soft_deletes.retention.operations]?"));
+            assertThat(cause.getMetadataKeys().size(), equalTo(1));
+            assertThat(cause.getMetadata(Ccr.REQUESTED_OPS_MISSING_METADATA_KEY), notNullValue());
+            assertThat(cause.getMetadata(Ccr.REQUESTED_OPS_MISSING_METADATA_KEY), contains("0", "0"));
+        }
     }
 
 }