From 8270021ea52e1fcd72542971447f1a922d6264fa Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 29 Jan 2019 10:23:05 -0500 Subject: [PATCH] Move update and delete by query to use seq# for optimistic concurrency control (#37857) The delete and update by query APIs both offer protection against overriding concurrent user changes to the documents they touch. They currently are using internal versioning. This PR changes that to rely on sequences numbers and primary terms. Relates #37639 Relates #36148 Relates #10708 --- docs/reference/docs/index_.asciidoc | 8 --- .../AbstractAsyncBulkByScrollAction.java | 26 ++++---- .../reindex/AsyncDeleteByQueryAction.java | 26 +++++--- .../index/reindex/TransportReindexAction.java | 25 ++++---- .../reindex/TransportUpdateByQueryAction.java | 30 +++++---- .../reindex/remote/RemoteResponseParsers.java | 10 +-- .../reindex/AsyncBulkByScrollActionTests.java | 9 +-- .../reindex/UpdateByQueryMetadataTests.java | 12 ++-- .../reindex/UpdateByQueryWithScriptTests.java | 3 +- .../test/delete_by_query/10_basic.yml | 64 ++++++++++++++++++- .../test/delete_by_query/40_versioning.yml | 4 ++ .../test/update_by_query/10_basic.yml | 49 +++++++++++++- .../test/update_by_query/40_versioning.yml | 3 + .../reindex/ClientScrollableHitSource.java | 10 +++ .../index/reindex/ScrollableHitSource.java | 33 +++++++++- 15 files changed, 232 insertions(+), 80 deletions(-) diff --git a/docs/reference/docs/index_.asciidoc b/docs/reference/docs/index_.asciidoc index 8e586bcff402b..27776aef95c3e 100644 --- a/docs/reference/docs/index_.asciidoc +++ b/docs/reference/docs/index_.asciidoc @@ -372,14 +372,6 @@ the current document version of 1. If the document was already updated and its version was set to 2 or higher, the indexing command will fail and result in a conflict (409 http status code). -WARNING: External versioning supports the value 0 as a valid version number. -This allows the version to be in sync with an external versioning system -where version numbers start from zero instead of one. It has the side effect -that documents with version number equal to zero can neither be updated -using the <> nor be deleted -using the <> as long as their -version number is equal to zero. - A nice side effect is that there is no need to maintain strict ordering of async indexing operations executed as a result of changes to a source database, as long as version numbers from the source database are used. diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java index ad833bd310632..aa0b331a084fc 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java @@ -35,7 +35,6 @@ import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.ParentTaskAssigningClient; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -51,6 +50,7 @@ import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.UpdateScript; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.threadpool.ThreadPool; @@ -89,7 +89,6 @@ public abstract class AbstractAsyncBulkByScrollActionrequest variables all representing child @@ -112,9 +111,10 @@ public abstract class AbstractAsyncBulkByScrollAction, ScrollableHitSource.Hit, RequestWrapper> scriptApplier; - public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, - ThreadPool threadPool, Request mainRequest, ScriptService scriptService, ClusterState clusterState, - ActionListener listener) { + public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, boolean needsSourceDocumentVersions, + boolean needsSourceDocumentSeqNoAndPrimaryTerm, Logger logger, ParentTaskAssigningClient client, + ThreadPool threadPool, Request mainRequest, ScriptService scriptService, + ActionListener listener) { this.task = task; if (!task.isWorker()) { @@ -126,7 +126,6 @@ public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, Logger logger, Par this.client = client; this.threadPool = threadPool; this.scriptService = scriptService; - this.clusterState = clusterState; this.mainRequest = mainRequest; this.listener = listener; BackoffPolicy backoffPolicy = buildBackoffPolicy(); @@ -138,11 +137,13 @@ public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, Logger logger, Par * them and if we add _doc as the first sort by default then sorts will never work.... So we add it here, only if there isn't * another sort. */ - List> sorts = mainRequest.getSearchRequest().source().sorts(); + final SearchSourceBuilder sourceBuilder = mainRequest.getSearchRequest().source(); + List> sorts = sourceBuilder.sorts(); if (sorts == null || sorts.isEmpty()) { - mainRequest.getSearchRequest().source().sort(fieldSort("_doc")); + sourceBuilder.sort(fieldSort("_doc")); } - mainRequest.getSearchRequest().source().version(needsSourceDocumentVersions()); + sourceBuilder.version(needsSourceDocumentVersions); + sourceBuilder.seqNoAndPrimaryTerm(needsSourceDocumentSeqNoAndPrimaryTerm); } /** @@ -154,12 +155,7 @@ public BiFunction, ScrollableHitSource.Hit, RequestWrapper> // The default script applier executes a no-op return (request, searchHit) -> request; } - - /** - * Does this operation need the versions of the source documents? - */ - protected abstract boolean needsSourceDocumentVersions(); - + /** * Build the {@link RequestWrapper} for a single search hit. This shouldn't handle * metadata or scripting. That will be handled by copyMetadata and diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java index 8dd30a9fa9d65..91141d04f0019 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.index.reindex; import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.client.ParentTaskAssigningClient; @@ -31,20 +32,20 @@ * Implementation of delete-by-query using scrolling and bulk. */ public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction { + private final boolean useSeqNoForCAS; + public AsyncDeleteByQueryAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, ThreadPool threadPool, DeleteByQueryRequest request, ScriptService scriptService, ClusterState clusterState, ActionListener listener) { - super(task, logger, client, threadPool, request, scriptService, clusterState, listener); + super(task, + // not all nodes support sequence number powered optimistic concurrency control, we fall back to version + clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0) == false, + // all nodes support sequence number powered optimistic concurrency control and we can use it + clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0), + logger, client, threadPool, request, scriptService, listener); + useSeqNoForCAS = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0); } - @Override - protected boolean needsSourceDocumentVersions() { - /* - * We always need the version of the source document so we can report a version conflict if we try to delete it and it has been - * changed. - */ - return true; - } @Override protected boolean accept(ScrollableHitSource.Hit doc) { @@ -59,7 +60,12 @@ protected RequestWrapper buildRequest(ScrollableHitSource.Hit doc delete.index(doc.getIndex()); delete.type(doc.getType()); delete.id(doc.getId()); - delete.version(doc.getVersion()); + if (useSeqNoForCAS) { + delete.setIfSeqNo(doc.getSeqNo()); + delete.setIfPrimaryTerm(doc.getPrimaryTerm()); + } else { + delete.version(doc.getVersion()); + } return wrap(delete); } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java index 8ddca8241c97e..e84685d1f7ca0 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java @@ -37,10 +37,6 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkItemResponse.Failure; -import org.elasticsearch.client.RestClientBuilder; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.xcontent.DeprecationHandler; -import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.ActionFilters; @@ -49,22 +45,26 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.ParentTaskAssigningClient; import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.VersionFieldMapper; +import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure; import org.elasticsearch.index.reindex.remote.RemoteScrollableHitSource; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptService; @@ -259,16 +259,13 @@ static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction listener) { - super(task, logger, client, threadPool, request, scriptService, clusterState, listener); - } - - @Override - protected boolean needsSourceDocumentVersions() { - /* - * We only need the source version if we're going to use it when write and we only do that when the destination request uses - * external versioning. - */ - return mainRequest.getDestination().versionType() != VersionType.INTERNAL; + super(task, + /* + * We only need the source version if we're going to use it when write and we only do that when the destination request uses + * external versioning. + */ + request.getDestination().versionType() != VersionType.INTERNAL, + false, logger, client, threadPool, request, scriptService, listener); } @Override diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java index 5ec480c1bad49..1ad7f6dc9d73d 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.index.reindex; import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActionFilters; @@ -86,19 +87,19 @@ protected void doExecute(UpdateByQueryRequest request, ActionListener { + + private final boolean useSeqNoForCAS; + AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, ThreadPool threadPool, UpdateByQueryRequest request, ScriptService scriptService, ClusterState clusterState, ActionListener listener) { - super(task, logger, client, threadPool, request, scriptService, clusterState, listener); - } - - @Override - protected boolean needsSourceDocumentVersions() { - /* - * We always need the version of the source document so we can report a version conflict if we try to delete it and it has - * been changed. - */ - return true; + super(task, + // not all nodes support sequence number powered optimistic concurrency control, we fall back to version + clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0) == false, + // all nodes support sequence number powered optimistic concurrency control and we can use it + clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0), + logger, client, threadPool, request, scriptService, listener); + useSeqNoForCAS = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0); } @Override @@ -117,8 +118,13 @@ protected RequestWrapper buildRequest(ScrollableHitSource.Hit doc) index.type(doc.getType()); index.id(doc.getId()); index.source(doc.getSource(), doc.getXContentType()); - index.versionType(VersionType.INTERNAL); - index.version(doc.getVersion()); + if (useSeqNoForCAS) { + index.setIfSeqNo(doc.getSeqNo()); + index.setIfPrimaryTerm(doc.getPrimaryTerm()); + } else { + index.versionType(VersionType.INTERNAL); + index.version(doc.getVersion()); + } index.setPipeline(mainRequest.getPipeline()); return wrap(index); } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteResponseParsers.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteResponseParsers.java index d18e9c85bcdab..6412f64967d99 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteResponseParsers.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteResponseParsers.java @@ -20,13 +20,9 @@ package org.elasticsearch.index.reindex.remote; import org.elasticsearch.Version; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.index.reindex.ScrollableHitSource.BasicHit; -import org.elasticsearch.index.reindex.ScrollableHitSource.Hit; -import org.elasticsearch.index.reindex.ScrollableHitSource.Response; -import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParsingException; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.xcontent.ConstructingObjectParser; @@ -36,6 +32,10 @@ import org.elasticsearch.common.xcontent.XContentLocation; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.reindex.ScrollableHitSource.BasicHit; +import org.elasticsearch.index.reindex.ScrollableHitSource.Hit; +import org.elasticsearch.index.reindex.ScrollableHitSource.Response; +import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure; import java.io.IOException; import java.util.List; diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java index 3d2199e516773..e886a3d614124 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java @@ -677,13 +677,8 @@ private void simulateScrollResponse(DummyAsyncBulkByScrollAction action, TimeVal private class DummyAsyncBulkByScrollAction extends AbstractAsyncBulkByScrollAction { DummyAsyncBulkByScrollAction() { - super(testTask, AsyncBulkByScrollActionTests.this.logger, new ParentTaskAssigningClient(client, localNode, testTask), - client.threadPool(), testRequest, null, null, listener); - } - - @Override - protected boolean needsSourceDocumentVersions() { - return randomBoolean(); + super(testTask, randomBoolean(), randomBoolean(), AsyncBulkByScrollActionTests.this.logger, + new ParentTaskAssigningClient(client, localNode, testTask), client.threadPool(), testRequest, null, listener); } @Override diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryMetadataTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryMetadataTests.java index 3ce8884ff92fb..95ee787f13f63 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryMetadataTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryMetadataTests.java @@ -19,12 +19,14 @@ package org.elasticsearch.index.reindex; -import org.elasticsearch.index.reindex.ScrollableHitSource.Hit; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.index.reindex.ScrollableHitSource.Hit; public class UpdateByQueryMetadataTests - extends AbstractAsyncBulkByScrollActionMetadataTestCase { - public void testRoutingIsCopied() throws Exception { + extends AbstractAsyncBulkByScrollActionMetadataTestCase { + + public void testRoutingIsCopied() { IndexRequest index = new IndexRequest(); action().copyMetadata(AbstractAsyncBulkByScrollAction.wrap(index), doc().setRouting("foo")); assertEquals("foo", index.routing()); @@ -43,12 +45,12 @@ protected UpdateByQueryRequest request() { private class TestAction extends TransportUpdateByQueryAction.AsyncIndexBySearchAction { TestAction() { super(UpdateByQueryMetadataTests.this.task, UpdateByQueryMetadataTests.this.logger, null, - UpdateByQueryMetadataTests.this.threadPool, request(), null, null, listener()); + UpdateByQueryMetadataTests.this.threadPool, request(), null, ClusterState.EMPTY_STATE, listener()); } @Override public AbstractAsyncBulkByScrollAction.RequestWrapper copyMetadata(AbstractAsyncBulkByScrollAction.RequestWrapper request, - Hit doc) { + Hit doc) { return super.copyMetadata(request, doc); } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWithScriptTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWithScriptTests.java index 214acebe218b7..7a038129d81e5 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWithScriptTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWithScriptTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.reindex; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.script.ScriptService; import java.util.Date; @@ -54,6 +55,6 @@ protected UpdateByQueryRequest request() { @Override protected TransportUpdateByQueryAction.AsyncIndexBySearchAction action(ScriptService scriptService, UpdateByQueryRequest request) { return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request, scriptService, - null, listener()); + ClusterState.EMPTY_STATE, listener()); } } diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/10_basic.yml b/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/10_basic.yml index e5bf6368eaba8..5eae09ff77be0 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/10_basic.yml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/10_basic.yml @@ -89,7 +89,11 @@ - is_false: response.task --- -"Response for version conflict": +"Response for version conflict (version powered)": + - skip: + version: "6.7.0 - " + reason: reindex moved to rely on sequence numbers for concurrency control + - do: indices.create: index: test @@ -143,6 +147,64 @@ - match: {count: 1} +--- +"Response for version conflict (seq no powered)": + - skip: + version: " - 6.6.99" + reason: reindex moved to rely on sequence numbers for concurrency control + + - do: + indices.create: + index: test + body: + settings: + index.refresh_interval: -1 + - do: + index: + index: test + type: _doc + id: 1 + body: { "text": "test" } + - do: + indices.refresh: {} + # Creates a new version for reindex to miss on scan. + - do: + index: + index: test + type: _doc + id: 1 + body: { "text": "test2" } + + - do: + catch: conflict + delete_by_query: + index: test + body: + query: + match_all: {} + + - match: {deleted: 0} + - match: {version_conflicts: 1} + - match: {batches: 1} + - match: {failures.0.index: test} + - match: {failures.0.type: _doc} + - match: {failures.0.id: "1"} + - match: {failures.0.status: 409} + - match: {failures.0.cause.type: version_conflict_engine_exception} + - match: {failures.0.cause.reason: "/\\[_doc\\]\\[1\\]:.version.conflict,.required.seqNo.\\[\\d+\\]/"} + - match: {failures.0.cause.shard: /\d+/} + - match: {failures.0.cause.index: test} + - gte: { took: 0 } + + - do: + indices.refresh: {} + + - do: + count: + index: test + + - match: {count: 1} + --- "Response for version conflict with conflicts=proceed": - do: diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/40_versioning.yml b/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/40_versioning.yml index c81305e282431..e6638e5069928 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/40_versioning.yml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/40_versioning.yml @@ -1,5 +1,9 @@ --- "delete_by_query fails to delete documents with version number equal to zero": + - skip: + version: "6.7.0 - " + reason: reindex moved to rely on sequence numbers for concurrency control + - do: index: index: index1 diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/10_basic.yml b/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/10_basic.yml index 784623f714ca6..7b38f8636a757 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/10_basic.yml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/10_basic.yml @@ -74,7 +74,10 @@ - is_false: response.deleted --- -"Response for version conflict": +"Response for version conflict (version powered)": + - skip: + version: "6.7.0 - " + reason: reindex moved to rely on sequence numbers for concurrency control - do: indices.create: index: test @@ -115,6 +118,50 @@ - match: {failures.0.cause.index: test} - gte: { took: 0 } +--- +"Response for version conflict (seq no powered)": + - skip: + version: " - 6.6.99" + reason: reindex moved to rely on sequence numbers for concurrency control + - do: + indices.create: + index: test + body: + settings: + index.refresh_interval: -1 + - do: + index: + index: test + type: _doc + id: 1 + body: { "text": "test" } + - do: + indices.refresh: {} + # Creates a new version for reindex to miss on scan. + - do: + index: + index: test + type: _doc + id: 1 + body: { "text": "test2" } + + - do: + catch: conflict + update_by_query: + index: test + - match: {updated: 0} + - match: {version_conflicts: 1} + - match: {batches: 1} + - match: {failures.0.index: test} + - match: {failures.0.type: _doc} + - match: {failures.0.id: "1"} + - match: {failures.0.status: 409} + - match: {failures.0.cause.type: version_conflict_engine_exception} + - match: {failures.0.cause.reason: "/\\[_doc\\]\\[1\\]:.version.conflict,.required.seqNo.\\[\\d+\\]/"} + - match: {failures.0.cause.shard: /\d+/} + - match: {failures.0.cause.index: test} + - gte: { took: 0 } + --- "Response for version conflict with conflicts=proceed": - do: diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/40_versioning.yml b/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/40_versioning.yml index 1718714defd4e..b7eaffc019fd9 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/40_versioning.yml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/40_versioning.yml @@ -24,6 +24,9 @@ --- "update_by_query fails to update documents with version number equal to zero": + - skip: + version: "6.7.0 - " + reason: reindex moved to rely on sequence numbers for concurrency control - do: index: index: index1 diff --git a/server/src/main/java/org/elasticsearch/index/reindex/ClientScrollableHitSource.java b/server/src/main/java/org/elasticsearch/index/reindex/ClientScrollableHitSource.java index 67e0f5400b389..58d4bb73e4572 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/ClientScrollableHitSource.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/ClientScrollableHitSource.java @@ -245,6 +245,16 @@ public long getVersion() { public String getParent() { return fieldValue(ParentFieldMapper.NAME); } + + @Override + public long getSeqNo() { + return delegate.getSeqNo(); + } + + @Override + public long getPrimaryTerm() { + return delegate.getPrimaryTerm(); + } @Override public String getRouting() { diff --git a/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java b/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java index f0bfa9c80e746..07311112870fc 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java @@ -30,10 +30,10 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.ToXContent.Params; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.search.SearchHit; import org.elasticsearch.threadpool.ThreadPool; @@ -191,6 +191,17 @@ public interface Hit { * internal APIs. */ long getVersion(); + + /** + * The sequence number of the match or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if sequence numbers weren't requested. + */ + long getSeqNo(); + + /** + * The primary term of the match or {@link SequenceNumbers#UNASSIGNED_PRIMARY_TERM} if sequence numbers weren't requested. + */ + long getPrimaryTerm(); + /** * The source of the hit. Returns null if the source didn't come back from the search, usually because it source wasn't stored at * all. @@ -223,6 +234,8 @@ public static class BasicHit implements Hit { private XContentType xContentType; private String parent; private String routing; + private long seqNo; + private long primaryTerm; public BasicHit(String index, String type, String id, long version) { this.index = index; @@ -251,6 +264,16 @@ public long getVersion() { return version; } + @Override + public long getSeqNo() { + return seqNo; + } + + @Override + public long getPrimaryTerm() { + return primaryTerm; + } + @Override public BytesReference getSource() { return source; @@ -286,6 +309,14 @@ public BasicHit setRouting(String routing) { this.routing = routing; return this; } + + public void setSeqNo(long seqNo) { + this.seqNo = seqNo; + } + + public void setPrimaryTerm(long primaryTerm) { + this.primaryTerm = primaryTerm; + } } /**