From 6c297ad7c8cc85a0d30b7a38f1eecd835a650c70 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 15 Jan 2018 18:13:47 -0500 Subject: [PATCH 1/8] TEST: Update logging for testAckedIndexing - Log the response of indexing requests - Correct logging setting for discovery package --- .../org/elasticsearch/discovery/ClusterDisruptionIT.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java index 8d21c6306382b..55f5b70e70299 100644 --- a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java @@ -81,7 +81,8 @@ public class ClusterDisruptionIT extends AbstractDisruptionTestCase { *

* This test is a superset of tests run in the Jepsen test suite, with the exception of versioned updates */ - @TestLogging("_root:DEBUG,org.elasticsearch.action.bulk:TRACE,org.elasticsearch.action.get:TRACE,discovery:TRACE," + + @TestLogging("_root:DEBUG,org.elasticsearch.action.bulk:TRACE,org.elasticsearch.action.get:TRACE," + + "org.elasticsearch.discovery:TRACE,org.elasticsearch.action.support.replication:TRACE," + "org.elasticsearch.cluster.service:TRACE,org.elasticsearch.indices.recovery:TRACE," + "org.elasticsearch.indices.cluster:TRACE,org.elasticsearch.index.shard:TRACE") public void testAckedIndexing() throws Exception { @@ -137,7 +138,7 @@ public void testAckedIndexing() throws Exception { .get(timeout); assertEquals(DocWriteResponse.Result.CREATED, response.getResult()); ackedDocs.put(id, node); - logger.trace("[{}] indexed id [{}] through node [{}]", name, id, node); + logger.trace("[{}] indexed id [{}] through node [{}], response [{}]", name, id, node, response); } catch (ElasticsearchException e) { exceptedExceptions.add(e); final String docId = id; From 71ba314c733fe5f2a175e2b8e8d871d61e3e3202 Mon Sep 17 00:00:00 2001 From: fbsolo Date: Tue, 16 Jan 2018 00:35:35 -0800 Subject: [PATCH 2/8] [Docs] Changes to ingest.asciidoc (#28212) --- docs/reference/ingest.asciidoc | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/docs/reference/ingest.asciidoc b/docs/reference/ingest.asciidoc index da1164930bc1e..18349beab6ab1 100644 --- a/docs/reference/ingest.asciidoc +++ b/docs/reference/ingest.asciidoc @@ -3,26 +3,27 @@ [partintro] -- -You can use ingest node to pre-process documents before the actual indexing takes place. -This pre-processing happens by an ingest node that intercepts bulk and index requests, applies the -transformations, and then passes the documents back to the index or bulk APIs. +Use an ingest node to pre-process documents before the actual document indexing happens. +The ingest node intercepts bulk and index requests, it applies transformations, and it then +passes the documents back to the index or bulk APIs. -You can enable ingest on any node or even have dedicated ingest nodes. Ingest is enabled by default -on all nodes. To disable ingest on a node, configure the following setting in the `elasticsearch.yml` file: +All nodes enable ingest by default, so any node can handle ingest tasks. You can also create +dedicated ingest nodes. To disable ingest for a node, configure the following setting in the +elasticsearch.yml file: [source,yaml] -------------------------------------------------- node.ingest: false -------------------------------------------------- -To pre-process documents before indexing, you <> that specifies -a series of <>. Each processor transforms the document in some way. -For example, you may have a pipeline that consists of one processor that removes a field from -the document followed by another processor that renames a field. Configured pipelines are then stored -in the <>. +To pre-process documents before indexing, <> that specifies a series of +<>. Each processor transforms the document in some specific way. For example, a +pipeline might have one processor that removes a field from the document, followed by +another processor that renames a field. The <> then stores +the configured pipelines. -To use a pipeline, you simply specify the `pipeline` parameter on an index or bulk request to -tell the ingest node which pipeline to use. For example: +To use a pipeline, simply specify the `pipeline` parameter on an index or bulk request. This +way, the ingest node knows which pipeline to use. For example: [source,js] -------------------------------------------------- From 0c4e2cbc19a9dad671b135d8f473943119677409 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 16 Jan 2018 09:50:06 +0100 Subject: [PATCH 3/8] Fallback to TransportMasterNodeAction for cluster health retries (#28195) ClusterHealthAction does not use the regular retry logic, possibly causing StackOverflowErrors. Relates #28169 --- .../admin/cluster/health/TransportClusterHealthAction.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java index f4c7748d43924..541738d6be7cc 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -125,7 +126,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS @Override public void onNoLongerMaster(String source) { logger.trace("stopped being master while waiting for events with priority [{}]. retrying.", request.waitForEvents()); - doExecute(task, request, listener); + // TransportMasterNodeAction implements the retry logic, which is triggered by passing a NotMasterException + listener.onFailure(new NotMasterException("no longer master. source: [" + source + "]")); } @Override From 196c7b80dc2e8bebd9d9023be13639a2078f3d15 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 16 Jan 2018 09:58:58 +0100 Subject: [PATCH 4/8] Never return null from Strings.tokenizeToStringArray (#28224) This method has a different contract than all the other methods in this class, returning null instead of an empty array when receiving a null input. While switching over some methods from delimitedListToStringArray to this method tokenizeToStringArray, this resulted in unexpected nulls in some places of our code. Relates #28213 --- .../src/main/java/org/elasticsearch/common/Strings.java | 5 ++++- .../allocation/decider/FilterAllocationDeciderTests.java | 8 ++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/common/Strings.java b/server/src/main/java/org/elasticsearch/common/Strings.java index 6c2fc4e1ec153..02a0852b0a03a 100644 --- a/server/src/main/java/org/elasticsearch/common/Strings.java +++ b/server/src/main/java/org/elasticsearch/common/Strings.java @@ -474,6 +474,9 @@ public static String[] split(String toSplit, String delimiter) { * @see #delimitedListToStringArray */ public static String[] tokenizeToStringArray(final String s, final String delimiters) { + if (s == null) { + return EMPTY_ARRAY; + } return toStringArray(tokenizeToCollection(s, delimiters, ArrayList::new)); } @@ -536,7 +539,7 @@ public static String[] delimitedListToStringArray(String str, String delimiter) */ public static String[] delimitedListToStringArray(String str, String delimiter, String charsToDelete) { if (str == null) { - return new String[0]; + return EMPTY_ARRAY; } if (delimiter == null) { return new String[]{str}; diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java index c4105771229bc..8381f2f960b75 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java @@ -194,6 +194,14 @@ public void testInvalidIPFilter() { assertEquals("invalid IP address [" + invalidIP + "] for [" + filterSetting.getKey() + ipKey + "]", e.getMessage()); } + public void testNull() { + Setting filterSetting = randomFrom(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING, + IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING, IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING); + + IndexMetaData.builder("test") + .settings(settings(Version.CURRENT).putNull(filterSetting.getKey() + "name")).numberOfShards(2).numberOfReplicas(0).build(); + } + public void testWildcardIPFilter() { String ipKey = randomFrom("_ip", "_host_ip", "_publish_ip"); Setting filterSetting = randomFrom(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING, From efe2e521180f989218898867c5509d860fc46312 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Tue, 16 Jan 2018 10:50:07 +0100 Subject: [PATCH 5/8] Fix eclipse build. (#28236) Relates #28191 --- libs/elasticsearch-core/src/main/eclipse-build.gradle | 2 ++ libs/elasticsearch-core/src/test/eclipse-build.gradle | 6 ++++++ settings.gradle | 5 +++++ 3 files changed, 13 insertions(+) create mode 100644 libs/elasticsearch-core/src/main/eclipse-build.gradle create mode 100644 libs/elasticsearch-core/src/test/eclipse-build.gradle diff --git a/libs/elasticsearch-core/src/main/eclipse-build.gradle b/libs/elasticsearch-core/src/main/eclipse-build.gradle new file mode 100644 index 0000000000000..9c84a4d6bd84b --- /dev/null +++ b/libs/elasticsearch-core/src/main/eclipse-build.gradle @@ -0,0 +1,2 @@ +// this is just shell gradle file for eclipse to have separate projects for elasticsearch-core src and tests +apply from: '../../build.gradle' diff --git a/libs/elasticsearch-core/src/test/eclipse-build.gradle b/libs/elasticsearch-core/src/test/eclipse-build.gradle new file mode 100644 index 0000000000000..f43f019941bb2 --- /dev/null +++ b/libs/elasticsearch-core/src/test/eclipse-build.gradle @@ -0,0 +1,6 @@ +// this is just shell gradle file for eclipse to have separate projects for elasticsearch-core src and tests +apply from: '../../build.gradle' + +dependencies { + testCompile project(':libs:elasticsearch-core') +} diff --git a/settings.gradle b/settings.gradle index b844af52df76b..46ecb3dad1c97 100644 --- a/settings.gradle +++ b/settings.gradle @@ -110,6 +110,7 @@ if (isEclipse) { // eclipse cannot handle an intermediate dependency between main and test, so we must create separate projects // for server-src and server-tests projects << 'server-tests' + projects << 'libs:elasticsearch-core-tests' projects << 'libs:elasticsearch-nio-tests' } @@ -128,6 +129,10 @@ if (isEclipse) { project(":server").buildFileName = 'eclipse-build.gradle' project(":server-tests").projectDir = new File(rootProject.projectDir, 'server/src/test') project(":server-tests").buildFileName = 'eclipse-build.gradle' + project(":libs:elasticsearch-core").projectDir = new File(rootProject.projectDir, 'libs/elasticsearch-core/src/main') + project(":libs:elasticsearch-core").buildFileName = 'eclipse-build.gradle' + project(":libs:elasticsearch-core-tests").projectDir = new File(rootProject.projectDir, 'libs/elasticsearch-core/src/test') + project(":libs:elasticsearch-core-tests").buildFileName = 'eclipse-build.gradle' project(":libs:elasticsearch-nio").projectDir = new File(rootProject.projectDir, 'libs/elasticsearch-nio/src/main') project(":libs:elasticsearch-nio").buildFileName = 'eclipse-build.gradle' project(":libs:elasticsearch-nio-tests").projectDir = new File(rootProject.projectDir, 'libs/elasticsearch-nio/src/test') From 67c1f1c856cad9624087931e7ca1285e16cd55f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Tue, 16 Jan 2018 12:05:03 +0100 Subject: [PATCH 6/8] [Docs] Fix Java Api index administration usage (#28133) The Java API documentation for index administration currenty is wrong because the PutMappingRequestBuilder#setSource(Object... source) and CreateIndexRequestBuilder#addMapping(String type, Object... source) methods delegate to methods that check that the input arguments are valid key/value pairs: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-admin-indices.html This changes the docs so the java api code examples are included from documentation integration tests so we detect compile and runtime issues earlier. Closes #28131 --- .../admin/indices/put-mapping.asciidoc | 57 +++------------- .../admin/indices/create/CreateIndexIT.java | 68 +++++++++++++++++++ 2 files changed, 77 insertions(+), 48 deletions(-) diff --git a/docs/java-api/admin/indices/put-mapping.asciidoc b/docs/java-api/admin/indices/put-mapping.asciidoc index e52c66d96c3bb..97cfcf589b9d8 100644 --- a/docs/java-api/admin/indices/put-mapping.asciidoc +++ b/docs/java-api/admin/indices/put-mapping.asciidoc @@ -1,21 +1,13 @@ [[java-admin-indices-put-mapping]] +:base-dir: {docdir}/../../core/src/test/java/org/elasticsearch/action/admin/indices/create + ==== Put Mapping The PUT mapping API allows you to add a new type while creating an index: -[source,java] +["source","java",subs="attributes,callouts,macros"] -------------------------------------------------- -client.admin().indices().prepareCreate("twitter") <1> - .addMapping("tweet", "{\n" + <2> - " \"tweet\": {\n" + - " \"properties\": {\n" + - " \"message\": {\n" + - " \"type\": \"text\"\n" + - " }\n" + - " }\n" + - " }\n" + - " }") - .get(); +include-tagged::{base-dir}/CreateIndexIT.java[addMapping-create-index-request] -------------------------------------------------- <1> <> called `twitter` <2> It also adds a `tweet` mapping type. @@ -23,32 +15,9 @@ client.admin().indices().prepareCreate("twitter") <1> The PUT mapping API also allows to add a new type to an existing index: -[source,java] +["source","java",subs="attributes,callouts,macros"] -------------------------------------------------- -client.admin().indices().preparePutMapping("twitter") <1> - .setType("user") <2> - .setSource("{\n" + <3> - " \"properties\": {\n" + - " \"name\": {\n" + - " \"type\": \"text\"\n" + - " }\n" + - " }\n" + - "}") - .get(); - -// You can also provide the type in the source document -client.admin().indices().preparePutMapping("twitter") - .setType("user") - .setSource("{\n" + - " \"user\":{\n" + <4> - " \"properties\": {\n" + - " \"name\": {\n" + - " \"type\": \"text\"\n" + - " }\n" + - " }\n" + - " }\n" + - "}") - .get(); +include-tagged::{base-dir}/CreateIndexIT.java[putMapping-request-source] -------------------------------------------------- <1> Puts a mapping on existing index called `twitter` <2> Adds a `user` mapping type. @@ -57,20 +26,12 @@ client.admin().indices().preparePutMapping("twitter") You can use the same API to update an existing mapping: -[source,java] +["source","java",subs="attributes,callouts,macros"] -------------------------------------------------- -client.admin().indices().preparePutMapping("twitter") <1> - .setType("user") <2> - .setSource("{\n" + <3> - " \"properties\": {\n" + - " \"user_name\": {\n" + - " \"type\": \"text\"\n" + - " }\n" + - " }\n" + - "}") - .get(); +include-tagged::{base-dir}/CreateIndexIT.java[putMapping-request-source-append] -------------------------------------------------- <1> Puts a mapping on existing index called `twitter` <2> Updates the `user` mapping type. <3> This `user` has now a new field `user_name` +:base-dir!: \ No newline at end of file diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java b/server/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java index 14d6647071453..2ebb84ef92a72 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.admin.indices.create; import com.carrotsearch.hppc.cursors.ObjectCursor; + import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.UnavailableShardsException; @@ -28,6 +29,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -35,6 +37,7 @@ import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.RangeQueryBuilder; @@ -400,4 +403,69 @@ public Settings onNodeStopped(String nodeName) throws Exception { assertThat(e, hasToString(containsString("unknown setting [index.foo]"))); } + /** + * This test method is used to generate the Put Mapping Java Indices API documentation + * at "docs/java-api/admin/indices/put-mapping.asciidoc" so the documentation gets tested + * so that it compiles and runs without throwing errors at runtime. + */ + public void testPutMappingDocumentation() throws Exception { + Client client = client(); + // tag::addMapping-create-index-request + client.admin().indices().prepareCreate("twitter") // <1> + .addMapping("tweet", "{\n" + // <2> + " \"tweet\": {\n" + + " \"properties\": {\n" + + " \"message\": {\n" + + " \"type\": \"text\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }", XContentType.JSON) + .get(); + // end::addMapping-create-index-request + + // we need to delete in order to create a fresh new index with another type + client.admin().indices().prepareDelete("twitter").get(); + client.admin().indices().prepareCreate("twitter").get(); + + // tag::putMapping-request-source + client.admin().indices().preparePutMapping("twitter") // <1> + .setType("user") // <2> + .setSource("{\n" + // <3> + " \"properties\": {\n" + + " \"name\": {\n" + + " \"type\": \"text\"\n" + + " }\n" + + " }\n" + + "}", XContentType.JSON) + .get(); + + // You can also provide the type in the source document + client.admin().indices().preparePutMapping("twitter") + .setType("user") + .setSource("{\n" + + " \"user\":{\n" + // <4> + " \"properties\": {\n" + + " \"name\": {\n" + + " \"type\": \"text\"\n" + + " }\n" + + " }\n" + + " }\n" + + "}", XContentType.JSON) + .get(); + // end::putMapping-request-source + + // tag::putMapping-request-source-append + client.admin().indices().preparePutMapping("twitter") // <1> + .setType("user") // <2> + .setSource("{\n" + // <3> + " \"properties\": {\n" + + " \"user_name\": {\n" + + " \"type\": \"text\"\n" + + " }\n" + + " }\n" + + "}", XContentType.JSON) + .get(); + // end::putMapping-request-source-append + } } From 65e90079adb638f33be5ffb3f387169f15eb7f2b Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 16 Jan 2018 08:37:42 -0500 Subject: [PATCH 7/8] Open engine should keep only starting commit (#28228) Keeping unsafe commits when opening an engine can be problematic because these commits are not safe at the recovering time but they can suddenly become safe in the future. The following issues can happen if unsafe commits are kept oninit. 1. Replica can use unsafe commit in peer-recovery. This happens when a replica with a safe commit c1 (max_seqno=1) and an unsafe commit c2 (max_seqno=2) recovers from a primary with c1(max_seqno=1). If a new document (seqno=2) is added without flushing, the global checkpoint is advanced to 2; and the replica recovers again, it will use the unsafe commit c2 (max_seqno=2 <= gcp=2) as the starting commit for sequenced based recovery even the commit c2 contains a stale operation and the document (with seqno=2) will not be replicated to the replica. 2. Min translog gen for recovery can go backwards in peer-recovery. This happens when a replica with a safe commit c1 (local_checkpoint=1, recovery_translog_gen=1) and an unsafe commit c2 (local_checkpoint=2, recovery_translog_gen=2). The replica recovers from a primary, and keeps c2 as the last commit, then sets last_translog_gen to 2. Flushing a new commit on the replica will cause exception as the new last commit c3 will have recovery_translog_gen=1. The recovery translog generation of a commit is calculated based on the current local checkpoint. The local checkpoint of c3 is 1 while the local checkpoint of c2 is 2. 3. Commit without translog can be used for recovery. An old index, which was created before multiple-commits is introduced (v6.2), may not have a safe commit. If that index has a snapshotted commit without translog and an unsafe commit, the policy can consider the snapshotted commit as a safe commit for recovery even the commit does not have translog. These issues can be avoided if the combined deletion policy keeps only the starting commit onInit. Relates #27804 Relates #28181 --- .../index/engine/CombinedDeletionPolicy.java | 49 ++++++++++++--- .../index/engine/InternalEngine.java | 60 +++++++++++------- .../elasticsearch/index/shard/IndexShard.java | 15 +++-- .../org/elasticsearch/index/store/Store.java | 10 +-- .../engine/CombinedDeletionPolicyTests.java | 43 +++++++++++-- .../index/engine/InternalEngineTests.java | 63 ++++++++++++++++++- .../RecoveryDuringReplicationTests.java | 46 +++++++++++++- 7 files changed, 234 insertions(+), 52 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index e5d8cacf73657..ca0d93fa7c5aa 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -45,37 +45,72 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy { private final TranslogDeletionPolicy translogDeletionPolicy; private final EngineConfig.OpenMode openMode; private final LongSupplier globalCheckpointSupplier; + private final IndexCommit startingCommit; private final ObjectIntHashMap snapshottedCommits; // Number of snapshots held against each commit point. private IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint. private IndexCommit lastCommit; // the most recent commit point CombinedDeletionPolicy(EngineConfig.OpenMode openMode, TranslogDeletionPolicy translogDeletionPolicy, - LongSupplier globalCheckpointSupplier) { + LongSupplier globalCheckpointSupplier, IndexCommit startingCommit) { this.openMode = openMode; this.translogDeletionPolicy = translogDeletionPolicy; this.globalCheckpointSupplier = globalCheckpointSupplier; + this.startingCommit = startingCommit; this.snapshottedCommits = new ObjectIntHashMap<>(); } @Override - public void onInit(List commits) throws IOException { + public synchronized void onInit(List commits) throws IOException { switch (openMode) { case CREATE_INDEX_AND_TRANSLOG: + assert startingCommit == null : "CREATE_INDEX_AND_TRANSLOG must not have starting commit; commit [" + startingCommit + "]"; break; case OPEN_INDEX_CREATE_TRANSLOG: - assert commits.isEmpty() == false : "index is opened, but we have no commits"; - // When an engine starts with OPEN_INDEX_CREATE_TRANSLOG, a new fresh index commit will be created immediately. - // We therefore can simply skip processing here as `onCommit` will be called right after with a new commit. - break; case OPEN_INDEX_AND_TRANSLOG: assert commits.isEmpty() == false : "index is opened, but we have no commits"; - onCommit(commits); + assert startingCommit != null && commits.contains(startingCommit) : "Starting commit not in the existing commit list; " + + "startingCommit [" + startingCommit + "], commit list [" + commits + "]"; + keepOnlyStartingCommitOnInit(commits); + // OPEN_INDEX_CREATE_TRANSLOG can open an index commit from other shard with a different translog history, + // We therefore should not use that index commit to update the translog deletion policy. + if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { + updateTranslogDeletionPolicy(); + } break; default: throw new IllegalArgumentException("unknown openMode [" + openMode + "]"); } } + /** + * Keeping existing unsafe commits when opening an engine can be problematic because these commits are not safe + * at the recovering time but they can suddenly become safe in the future. + * The following issues can happen if unsafe commits are kept oninit. + *

+ * 1. Replica can use unsafe commit in peer-recovery. This happens when a replica with a safe commit c1(max_seqno=1) + * and an unsafe commit c2(max_seqno=2) recovers from a primary with c1(max_seqno=1). If a new document(seqno=2) + * is added without flushing, the global checkpoint is advanced to 2; and the replica recovers again, it will use + * the unsafe commit c2(max_seqno=2 at most gcp=2) as the starting commit for sequenced-based recovery even the + * commit c2 contains a stale operation and the document(with seqno=2) will not be replicated to the replica. + *

+ * 2. Min translog gen for recovery can go backwards in peer-recovery. This happens when are replica with a safe commit + * c1(local_checkpoint=1, recovery_translog_gen=1) and an unsafe commit c2(local_checkpoint=2, recovery_translog_gen=2). + * The replica recovers from a primary, and keeps c2 as the last commit, then sets last_translog_gen to 2. Flushing a new + * commit on the replica will cause exception as the new last commit c3 will have recovery_translog_gen=1. The recovery + * translog generation of a commit is calculated based on the current local checkpoint. The local checkpoint of c3 is 1 + * while the local checkpoint of c2 is 2. + *

+ * 3. Commit without translog can be used in recovery. An old index, which was created before multiple-commits is introduced + * (v6.2), may not have a safe commit. If that index has a snapshotted commit without translog and an unsafe commit, + * the policy can consider the snapshotted commit as a safe commit for recovery even the commit does not have translog. + */ + private void keepOnlyStartingCommitOnInit(List commits) { + commits.stream().filter(commit -> startingCommit.equals(commit) == false).forEach(IndexCommit::delete); + assert startingCommit.isDeleted() == false : "Starting commit must not be deleted"; + lastCommit = startingCommit; + safeCommit = startingCommit; + } + @Override public synchronized void onCommit(List commits) throws IOException { final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong()); diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1b7b891efd6ff..1efbd0706d156 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -185,7 +185,7 @@ public InternalEngine(EngineConfig engineConfig) { "Starting commit should be non-null; mode [" + openMode + "]; startingCommit [" + startingCommit + "]"; this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier, startingCommit); this.combinedDeletionPolicy = new CombinedDeletionPolicy(openMode, translogDeletionPolicy, - translog::getLastSyncedGlobalCheckpoint); + translog::getLastSyncedGlobalCheckpoint, startingCommit); writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, startingCommit); updateMaxUnsafeAutoIdTimestampFromWriter(writer); assert engineConfig.getForceNewHistoryUUID() == false @@ -411,28 +411,44 @@ public void skipTranslogRecovery() { } private IndexCommit getStartingCommitPoint() throws IOException { - if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { - final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint(); - final long minRetainedTranslogGen = translog.getMinFileGeneration(); - final List existingCommits = DirectoryReader.listCommits(store.directory()); - // We may not have a safe commit if an index was create before v6.2; and if there is a snapshotted commit whose full translog - // files are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit. - // To avoid this issue, we only select index commits whose translog files are fully retained. - if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) { - final List recoverableCommits = new ArrayList<>(); - for (IndexCommit commit : existingCommits) { - if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) { - recoverableCommits.add(commit); + final IndexCommit startingIndexCommit; + final List existingCommits; + switch (openMode) { + case CREATE_INDEX_AND_TRANSLOG: + startingIndexCommit = null; + break; + case OPEN_INDEX_CREATE_TRANSLOG: + // Use the last commit + existingCommits = DirectoryReader.listCommits(store.directory()); + startingIndexCommit = existingCommits.get(existingCommits.size() - 1); + break; + case OPEN_INDEX_AND_TRANSLOG: + // Use the safe commit + final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint(); + final long minRetainedTranslogGen = translog.getMinFileGeneration(); + existingCommits = DirectoryReader.listCommits(store.directory()); + // We may not have a safe commit if an index was create before v6.2; and if there is a snapshotted commit whose translog + // are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit. + // To avoid this issue, we only select index commits whose translog are fully retained. + if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) { + final List recoverableCommits = new ArrayList<>(); + for (IndexCommit commit : existingCommits) { + if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) { + recoverableCommits.add(commit); + } } + assert recoverableCommits.isEmpty() == false : "No commit point with translog found; " + + "commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]"; + startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint); + } else { + // TODO: Asserts the starting commit is a safe commit once peer-recovery sets global checkpoint. + startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint); } - assert recoverableCommits.isEmpty() == false : "No commit point with full translog found; " + - "commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]"; - return CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint); - } else { - return CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint); - } + break; + default: + throw new IllegalArgumentException("unknown mode: " + openMode); } - return null; + return startingIndexCommit; } private void recoverFromTranslogInternal() throws IOException { @@ -557,9 +573,7 @@ private ExternalSearcherManager createSearcherManager(SearchFactory externalSear final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId); internalSearcherManager = new SearcherManager(directoryReader, new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService())); - // The index commit from IndexWriterConfig is null if the engine is open with other modes - // rather than CREATE_INDEX_AND_TRANSLOG. In those cases lastCommittedSegmentInfos will be retrieved from the last commit. - lastCommittedSegmentInfos = store.readCommittedSegmentsInfo(indexWriter.getConfig().getIndexCommit()); + lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); ExternalSearcherManager externalSearcherManager = new ExternalSearcherManager(internalSearcherManager, externalSearcherFactory); success = true; diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 3832cd0ae2055..b5d28b3a9ecce 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.CheckIndex; +import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexOptions; import org.apache.lucene.index.LeafReaderContext; @@ -1290,12 +1291,16 @@ public void createIndexAndTranslog() throws IOException { /** opens the engine on top of the existing lucene engine but creates an empty translog **/ public void openIndexAndCreateTranslog(boolean forceNewHistoryUUID, long globalCheckpoint) throws IOException { - assert recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE && - recoveryState.getRecoverySource().getType() != RecoverySource.Type.EXISTING_STORE; - SequenceNumbers.CommitInfo commitInfo = store.loadSeqNoInfo(null); - assert commitInfo.localCheckpoint >= globalCheckpoint : - "trying to create a shard whose local checkpoint [" + commitInfo.localCheckpoint + "] is < global checkpoint [" + if (Assertions.ENABLED) { + assert recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE && + recoveryState.getRecoverySource().getType() != RecoverySource.Type.EXISTING_STORE; + SequenceNumbers.CommitInfo commitInfo = store.loadSeqNoInfo(null); + assert commitInfo.localCheckpoint >= globalCheckpoint : + "trying to create a shard whose local checkpoint [" + commitInfo.localCheckpoint + "] is < global checkpoint [" + globalCheckpoint + "]"; + final List existingCommits = DirectoryReader.listCommits(store.directory()); + assert existingCommits.size() == 1 : "Open index create translog should have one commit, commits[" + existingCommits + "]"; + } globalCheckpointTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "opening index with a new translog"); innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG, forceNewHistoryUUID); } diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index 74be98b813238..7aab2c750d139 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -182,17 +182,9 @@ public Directory directory() { * @throws IOException if the index is corrupted or the segments file is not present */ public SegmentInfos readLastCommittedSegmentsInfo() throws IOException { - return readCommittedSegmentsInfo(null); - } - - /** - * Returns the committed segments info for the given commit point. - * If the commit point is not provided, this method will return the segments info of the last commit in the store. - */ - public SegmentInfos readCommittedSegmentsInfo(final IndexCommit commit) throws IOException { failIfCorrupted(); try { - return readSegmentsInfo(commit, directory()); + return readSegmentsInfo(null, directory()); } catch (CorruptIndexException ex) { markStoreCorrupted(ex); throw ex; diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index e74cde52aa418..ca6059dae0067 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -54,7 +54,8 @@ public class CombinedDeletionPolicyTests extends ESTestCase { public void testKeepCommitsAfterGlobalCheckpoint() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy( + OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null); final LongArrayList maxSeqNoList = new LongArrayList(); final LongArrayList translogGenList = new LongArrayList(); @@ -93,7 +94,8 @@ public void testAcquireIndexCommit() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(); final UUID translogUUID = UUID.randomUUID(); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy( + OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null); long lastMaxSeqNo = between(1, 1000); long lastTranslogGen = between(1, 20); int safeIndex = 0; @@ -156,11 +158,12 @@ public void testLegacyIndex() throws Exception { final UUID translogUUID = UUID.randomUUID(); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy( + OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null); long legacyTranslogGen = randomNonNegativeLong(); IndexCommit legacyCommit = mockLegacyIndexCommit(translogUUID, legacyTranslogGen); - indexPolicy.onInit(singletonList(legacyCommit)); + indexPolicy.onCommit(singletonList(legacyCommit)); verify(legacyCommit, never()).delete(); assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(legacyTranslogGen)); assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(legacyTranslogGen)); @@ -188,7 +191,8 @@ public void testLegacyIndex() throws Exception { public void testDeleteInvalidCommits() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong()); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_CREATE_TRANSLOG, translogPolicy, globalCheckpoint::get); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy( + OPEN_INDEX_CREATE_TRANSLOG, translogPolicy, globalCheckpoint::get, null); final int invalidCommits = between(1, 10); final List commitList = new ArrayList<>(); @@ -211,6 +215,35 @@ public void testDeleteInvalidCommits() throws Exception { } } + /** + * Keeping existing unsafe commits can be problematic because these commits are not safe at the recovering time + * but they can suddenly become safe in the future. See {@link CombinedDeletionPolicy#keepOnlyStartingCommitOnInit(List)} + */ + public void testKeepOnlyStartingCommitOnInit() throws Exception { + final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong()); + TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); + final UUID translogUUID = UUID.randomUUID(); + final List commitList = new ArrayList<>(); + int totalCommits = between(2, 20); + for (int i = 0; i < totalCommits; i++) { + commitList.add(mockIndexCommit(randomNonNegativeLong(), translogUUID, randomNonNegativeLong())); + } + final IndexCommit startingCommit = randomFrom(commitList); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy( + OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, startingCommit); + indexPolicy.onInit(commitList); + for (IndexCommit commit : commitList) { + if (commit.equals(startingCommit) == false) { + verify(commit, times(1)).delete(); + } + } + verify(startingCommit, never()).delete(); + assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), + equalTo(Long.parseLong(startingCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); + assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), + equalTo(Long.parseLong(startingCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); + } + IndexCommit mockIndexCommit(long maxSeqNo, UUID translogUUID, long translogGen) throws IOException { final Map userData = new HashMap<>(); userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo)); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 518411e59e8cd..db62db7e01b46 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -163,6 +163,7 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThan; @@ -4010,13 +4011,15 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { boolean flushed = false; + AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); Engine recoveringEngine = null; try { assertEquals(docs - 1, engine.getLocalCheckpointTracker().getMaxSeqNo()); assertEquals(docs - 1, engine.getLocalCheckpointTracker().getCheckpoint()); assertEquals(maxSeqIDOnReplica, replicaEngine.getLocalCheckpointTracker().getMaxSeqNo()); assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpointTracker().getCheckpoint()); - recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); + recoveringEngine = new InternalEngine(copy( + replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get)); assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().uncommittedOperations()); recoveringEngine.recoverFromTranslog(); assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo()); @@ -4038,6 +4041,8 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo()); assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getCheckpoint()); if ((flushed = randomBoolean())) { + globalCheckpoint.set(recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo()); + recoveringEngine.getTranslog().sync(); recoveringEngine.flush(true, true); } } @@ -4047,7 +4052,8 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { // now do it again to make sure we preserve values etc. try { - recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); + recoveringEngine = new InternalEngine( + copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get)); if (flushed) { assertEquals(0, recoveringEngine.getTranslog().uncommittedOperations()); } @@ -4355,4 +4361,57 @@ public void testAcquireIndexCommit() throws Exception { assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(1)); } } + + public void testOpenIndexAndTranslogKeepOnlySafeCommit() throws Exception { + IOUtils.close(engine); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); + final EngineConfig config = copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get); + final IndexCommit safeCommit; + try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG))) { + final int numDocs = between(5, 50); + for (int i = 0; i < numDocs; i++) { + index(engine, i); + if (randomBoolean()) { + engine.flush(); + } + } + // Selects a starting commit and advances and persists the global checkpoint to that commit. + final List commits = DirectoryReader.listCommits(engine.store.directory()); + safeCommit = randomFrom(commits); + globalCheckpoint.set(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO))); + engine.getTranslog().sync(); + } + try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) { + final List existingCommits = DirectoryReader.listCommits(engine.store.directory()); + assertThat("OPEN_INDEX_AND_TRANSLOG should keep only safe commit", existingCommits, contains(safeCommit)); + } + } + + public void testOpenIndexCreateTranslogKeepOnlyLastCommit() throws Exception { + IOUtils.close(engine); + final EngineConfig config = copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG); + final Map lastCommit; + try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) { + engine.skipTranslogRecovery(); + final int numDocs = between(5, 50); + for (int i = 0; i < numDocs; i++) { + index(engine, i); + if (randomBoolean()) { + engine.flush(); + } + } + final List commits = DirectoryReader.listCommits(engine.store.directory()); + lastCommit = commits.get(commits.size() - 1).getUserData(); + } + try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG))) { + final List existingCommits = DirectoryReader.listCommits(engine.store.directory()); + assertThat("OPEN_INDEX_CREATE_TRANSLOG should keep only last commit", existingCommits, hasSize(1)); + final Map userData = existingCommits.get(0).getUserData(); + assertThat(userData.get(SequenceNumbers.MAX_SEQ_NO), equalTo(lastCommit.get(SequenceNumbers.MAX_SEQ_NO))); + assertThat(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY), equalTo(lastCommit.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY))); + // Translog tags should be fresh. + assertThat(userData.get(Translog.TRANSLOG_UUID_KEY), not(equalTo(lastCommit.get(Translog.TRANSLOG_UUID_KEY)))); + assertThat(userData.get(Translog.TRANSLOG_GENERATION_KEY), equalTo("1")); + } + } } diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index aa97c2049915f..cd948ed9f9036 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -304,8 +304,52 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { replica.store().close(); newReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId()); shards.recoverReplica(newReplica); - shards.assertAllEqual(totalDocs); + // Make sure that flushing on a recovering shard is ok. + shards.flush(); + shards.assertAllEqual(totalDocs); + } + } + + public void testReplicaRollbackStaleDocumentsInPeerRecovery() throws Exception { + try (ReplicationGroup shards = createGroup(2)) { + shards.startAll(); + IndexShard oldPrimary = shards.getPrimary(); + IndexShard newPrimary = shards.getReplicas().get(0); + IndexShard replica = shards.getReplicas().get(1); + int goodDocs = shards.indexDocs(scaledRandomIntBetween(1, 20)); + shards.flush(); + // simulate docs that were inflight when primary failed, these will be rolled back + int staleDocs = scaledRandomIntBetween(1, 10); + logger.info("--> indexing {} stale docs", staleDocs); + for (int i = 0; i < staleDocs; i++) { + final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "stale_" + i) + .source("{}", XContentType.JSON); + final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary); + indexOnReplica(bulkShardRequest, replica); + } + shards.flush(); + shards.promoteReplicaToPrimary(newPrimary).get(); + // Recover a replica should rollback the stale documents + shards.removeReplica(replica); + replica.close("recover replica - first time", false); + replica.store().close(); + replica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId()); + shards.recoverReplica(replica); + shards.assertAllEqual(goodDocs); + // Index more docs - move the global checkpoint >= seqno of the stale operations. + goodDocs += shards.indexDocs(scaledRandomIntBetween(staleDocs, staleDocs * 5)); + shards.syncGlobalCheckpoint(); + assertThat(replica.getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(replica.seqNoStats().getMaxSeqNo())); + // Recover a replica again should also rollback the stale documents. + shards.removeReplica(replica); + replica.close("recover replica - second time", false); + replica.store().close(); + IndexShard anotherReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId()); + shards.recoverReplica(anotherReplica); + shards.assertAllEqual(goodDocs); + shards.flush(); + shards.assertAllEqual(goodDocs); } } From 4f5be7db3ce9f1ea7f864cc1fd38ee09363aa64d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Tue, 16 Jan 2018 15:19:47 +0100 Subject: [PATCH 8/8] [Docs] Fix base directory to include for put_mapping.asciidoc --- docs/java-api/admin/indices/put-mapping.asciidoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/java-api/admin/indices/put-mapping.asciidoc b/docs/java-api/admin/indices/put-mapping.asciidoc index 97cfcf589b9d8..887f6cb76e7c6 100644 --- a/docs/java-api/admin/indices/put-mapping.asciidoc +++ b/docs/java-api/admin/indices/put-mapping.asciidoc @@ -1,5 +1,5 @@ [[java-admin-indices-put-mapping]] -:base-dir: {docdir}/../../core/src/test/java/org/elasticsearch/action/admin/indices/create +:base-dir: {docdir}/../../server/src/test/java/org/elasticsearch/action/admin/indices/create ==== Put Mapping @@ -34,4 +34,4 @@ include-tagged::{base-dir}/CreateIndexIT.java[putMapping-request-source-append] <2> Updates the `user` mapping type. <3> This `user` has now a new field `user_name` -:base-dir!: \ No newline at end of file +:base-dir!: