Skip to content

Commit

Permalink
Move update and delete by query to use seq# for optimistic concurrenc…
Browse files Browse the repository at this point in the history
…y control (elastic#37857)

The delete and update by query APIs both offer protection against overriding concurrent user changes to the documents they touch. They currently are using internal versioning. This PR changes that to rely on sequences numbers and primary terms.

Relates elastic#37639
Relates elastic#36148
Relates elastic#10708
  • Loading branch information
bleskes committed Feb 1, 2019
1 parent 9c284e6 commit 8270021
Show file tree
Hide file tree
Showing 15 changed files with 232 additions and 80 deletions.
8 changes: 0 additions & 8 deletions docs/reference/docs/index_.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -372,14 +372,6 @@ the current document version of 1. If the document was already updated
and its version was set to 2 or higher, the indexing command will fail
and result in a conflict (409 http status code).

WARNING: External versioning supports the value 0 as a valid version number.
This allows the version to be in sync with an external versioning system
where version numbers start from zero instead of one. It has the side effect
that documents with version number equal to zero can neither be updated
using the <<docs-update-by-query,Update-By-Query API>> nor be deleted
using the <<docs-delete-by-query,Delete By Query API>> as long as their
version number is equal to zero.

A nice side effect is that there is no need to maintain strict ordering
of async indexing operations executed as a result of changes to a source
database, as long as version numbers from the source database are used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
Expand All @@ -51,6 +50,7 @@
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.UpdateScript;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -89,7 +89,6 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
protected final WorkerBulkByScrollTaskState worker;
protected final ThreadPool threadPool;
protected final ScriptService scriptService;
protected final ClusterState clusterState;

/**
* The request for this action. Named mainRequest because we create lots of <code>request</code> variables all representing child
Expand All @@ -112,9 +111,10 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
*/
private final BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> scriptApplier;

public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, Request mainRequest, ScriptService scriptService, ClusterState clusterState,
ActionListener<BulkByScrollResponse> listener) {
public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, boolean needsSourceDocumentVersions,
boolean needsSourceDocumentSeqNoAndPrimaryTerm, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, Request mainRequest, ScriptService scriptService,
ActionListener<BulkByScrollResponse> listener) {

this.task = task;
if (!task.isWorker()) {
Expand All @@ -126,7 +126,6 @@ public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, Logger logger, Par
this.client = client;
this.threadPool = threadPool;
this.scriptService = scriptService;
this.clusterState = clusterState;
this.mainRequest = mainRequest;
this.listener = listener;
BackoffPolicy backoffPolicy = buildBackoffPolicy();
Expand All @@ -138,11 +137,13 @@ public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, Logger logger, Par
* them and if we add _doc as the first sort by default then sorts will never work.... So we add it here, only if there isn't
* another sort.
*/
List<SortBuilder<?>> sorts = mainRequest.getSearchRequest().source().sorts();
final SearchSourceBuilder sourceBuilder = mainRequest.getSearchRequest().source();
List<SortBuilder<?>> sorts = sourceBuilder.sorts();
if (sorts == null || sorts.isEmpty()) {
mainRequest.getSearchRequest().source().sort(fieldSort("_doc"));
sourceBuilder.sort(fieldSort("_doc"));
}
mainRequest.getSearchRequest().source().version(needsSourceDocumentVersions());
sourceBuilder.version(needsSourceDocumentVersions);
sourceBuilder.seqNoAndPrimaryTerm(needsSourceDocumentSeqNoAndPrimaryTerm);
}

/**
Expand All @@ -154,12 +155,7 @@ public BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>>
// The default script applier executes a no-op
return (request, searchHit) -> request;
}

/**
* Does this operation need the versions of the source documents?
*/
protected abstract boolean needsSourceDocumentVersions();


/**
* Build the {@link RequestWrapper} for a single search hit. This shouldn't handle
* metadata or scripting. That will be handled by copyMetadata and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.index.reindex;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.ParentTaskAssigningClient;
Expand All @@ -31,20 +32,20 @@
* Implementation of delete-by-query using scrolling and bulk.
*/
public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction<DeleteByQueryRequest> {
private final boolean useSeqNoForCAS;

public AsyncDeleteByQueryAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, DeleteByQueryRequest request, ScriptService scriptService,
ClusterState clusterState, ActionListener<BulkByScrollResponse> listener) {
super(task, logger, client, threadPool, request, scriptService, clusterState, listener);
super(task,
// not all nodes support sequence number powered optimistic concurrency control, we fall back to version
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0) == false,
// all nodes support sequence number powered optimistic concurrency control and we can use it
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0),
logger, client, threadPool, request, scriptService, listener);
useSeqNoForCAS = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0);
}

@Override
protected boolean needsSourceDocumentVersions() {
/*
* We always need the version of the source document so we can report a version conflict if we try to delete it and it has been
* changed.
*/
return true;
}

@Override
protected boolean accept(ScrollableHitSource.Hit doc) {
Expand All @@ -59,7 +60,12 @@ protected RequestWrapper<DeleteRequest> buildRequest(ScrollableHitSource.Hit doc
delete.index(doc.getIndex());
delete.type(doc.getType());
delete.id(doc.getId());
delete.version(doc.getVersion());
if (useSeqNoForCAS) {
delete.setIfSeqNo(doc.getSeqNo());
delete.setIfPrimaryTerm(doc.getPrimaryTerm());
} else {
delete.version(doc.getVersion());
}
return wrap(delete);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.ActionFilters;
Expand All @@ -49,22 +45,26 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
import org.elasticsearch.index.reindex.remote.RemoteScrollableHitSource;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
Expand Down Expand Up @@ -259,16 +259,13 @@ static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<Re
AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, ReindexRequest request, ScriptService scriptService, ClusterState clusterState,
ActionListener<BulkByScrollResponse> listener) {
super(task, logger, client, threadPool, request, scriptService, clusterState, listener);
}

@Override
protected boolean needsSourceDocumentVersions() {
/*
* We only need the source version if we're going to use it when write and we only do that when the destination request uses
* external versioning.
*/
return mainRequest.getDestination().versionType() != VersionType.INTERNAL;
super(task,
/*
* We only need the source version if we're going to use it when write and we only do that when the destination request uses
* external versioning.
*/
request.getDestination().versionType() != VersionType.INTERNAL,
false, logger, client, threadPool, request, scriptService, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.index.reindex;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionFilters;
Expand Down Expand Up @@ -86,19 +87,19 @@ protected void doExecute(UpdateByQueryRequest request, ActionListener<BulkByScro
* Simple implementation of update-by-query using scrolling and bulk.
*/
static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<UpdateByQueryRequest> {

private final boolean useSeqNoForCAS;

AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, UpdateByQueryRequest request, ScriptService scriptService, ClusterState clusterState,
ActionListener<BulkByScrollResponse> listener) {
super(task, logger, client, threadPool, request, scriptService, clusterState, listener);
}

@Override
protected boolean needsSourceDocumentVersions() {
/*
* We always need the version of the source document so we can report a version conflict if we try to delete it and it has
* been changed.
*/
return true;
super(task,
// not all nodes support sequence number powered optimistic concurrency control, we fall back to version
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0) == false,
// all nodes support sequence number powered optimistic concurrency control and we can use it
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0),
logger, client, threadPool, request, scriptService, listener);
useSeqNoForCAS = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0);
}

@Override
Expand All @@ -117,8 +118,13 @@ protected RequestWrapper<IndexRequest> buildRequest(ScrollableHitSource.Hit doc)
index.type(doc.getType());
index.id(doc.getId());
index.source(doc.getSource(), doc.getXContentType());
index.versionType(VersionType.INTERNAL);
index.version(doc.getVersion());
if (useSeqNoForCAS) {
index.setIfSeqNo(doc.getSeqNo());
index.setIfPrimaryTerm(doc.getPrimaryTerm());
} else {
index.versionType(VersionType.INTERNAL);
index.version(doc.getVersion());
}
index.setPipeline(mainRequest.getPipeline());
return wrap(index);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,9 @@
package org.elasticsearch.index.reindex.remote;

import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.reindex.ScrollableHitSource.BasicHit;
import org.elasticsearch.index.reindex.ScrollableHitSource.Hit;
import org.elasticsearch.index.reindex.ScrollableHitSource.Response;
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
Expand All @@ -36,6 +32,10 @@
import org.elasticsearch.common.xcontent.XContentLocation;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.reindex.ScrollableHitSource.BasicHit;
import org.elasticsearch.index.reindex.ScrollableHitSource.Hit;
import org.elasticsearch.index.reindex.ScrollableHitSource.Response;
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;

import java.io.IOException;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,13 +677,8 @@ private void simulateScrollResponse(DummyAsyncBulkByScrollAction action, TimeVal

private class DummyAsyncBulkByScrollAction extends AbstractAsyncBulkByScrollAction<DummyAbstractBulkByScrollRequest> {
DummyAsyncBulkByScrollAction() {
super(testTask, AsyncBulkByScrollActionTests.this.logger, new ParentTaskAssigningClient(client, localNode, testTask),
client.threadPool(), testRequest, null, null, listener);
}

@Override
protected boolean needsSourceDocumentVersions() {
return randomBoolean();
super(testTask, randomBoolean(), randomBoolean(), AsyncBulkByScrollActionTests.this.logger,
new ParentTaskAssigningClient(client, localNode, testTask), client.threadPool(), testRequest, null, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

package org.elasticsearch.index.reindex;

import org.elasticsearch.index.reindex.ScrollableHitSource.Hit;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.index.reindex.ScrollableHitSource.Hit;

public class UpdateByQueryMetadataTests
extends AbstractAsyncBulkByScrollActionMetadataTestCase<UpdateByQueryRequest, BulkByScrollResponse> {
public void testRoutingIsCopied() throws Exception {
extends AbstractAsyncBulkByScrollActionMetadataTestCase<UpdateByQueryRequest, BulkByScrollResponse> {

public void testRoutingIsCopied() {
IndexRequest index = new IndexRequest();
action().copyMetadata(AbstractAsyncBulkByScrollAction.wrap(index), doc().setRouting("foo"));
assertEquals("foo", index.routing());
Expand All @@ -43,12 +45,12 @@ protected UpdateByQueryRequest request() {
private class TestAction extends TransportUpdateByQueryAction.AsyncIndexBySearchAction {
TestAction() {
super(UpdateByQueryMetadataTests.this.task, UpdateByQueryMetadataTests.this.logger, null,
UpdateByQueryMetadataTests.this.threadPool, request(), null, null, listener());
UpdateByQueryMetadataTests.this.threadPool, request(), null, ClusterState.EMPTY_STATE, listener());
}

@Override
public AbstractAsyncBulkByScrollAction.RequestWrapper<?> copyMetadata(AbstractAsyncBulkByScrollAction.RequestWrapper<?> request,
Hit doc) {
Hit doc) {
return super.copyMetadata(request, doc);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.index.reindex;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.script.ScriptService;

import java.util.Date;
Expand Down Expand Up @@ -54,6 +55,6 @@ protected UpdateByQueryRequest request() {
@Override
protected TransportUpdateByQueryAction.AsyncIndexBySearchAction action(ScriptService scriptService, UpdateByQueryRequest request) {
return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request, scriptService,
null, listener());
ClusterState.EMPTY_STATE, listener());
}
}
Loading

0 comments on commit 8270021

Please sign in to comment.