Skip to content

Commit

Permalink
6.x Backport of sequence number powered CAS PRs: #37977, #37857 and #…
Browse files Browse the repository at this point in the history
…37872 (#38155)

* Move update and delete by query to use seq# for optimistic concurrency control (#37857)

The delete and update by query APIs both offer protection against overriding concurrent user changes to the documents they touch. They currently are using internal versioning. This PR changes that to rely on sequences numbers and primary terms.

Relates #37639
Relates #36148
Relates #10708

* Add Seq# based optimistic concurrency control to UpdateRequest (#37872)

The update request has a lesser known support for a one off update of a known document version. This PR adds an a seq# based alternative to power these operations.

Relates #36148
Relates #10708

* Move watcher to use seq# and primary term for concurrency control (#37977)

* Adapt minimum versions for seq# power operations

After backporting #37977, #37857 and #37872
  • Loading branch information
bleskes authored Feb 1, 2019
1 parent 2038221 commit c7ef318
Show file tree
Hide file tree
Showing 76 changed files with 1,321 additions and 288 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.script.mustache.MultiSearchTemplateRequest;
Expand Down Expand Up @@ -882,6 +883,24 @@ Params withWaitForActiveShards(ActiveShardCount currentActiveShardCount, ActiveS
return this;
}

Params withIfSeqNo(long ifSeqNo) {
if (ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
return putParam("if_seq_no", Long.toString(ifSeqNo));
}
return this;
}

Params withIfPrimaryTerm(long ifPrimaryTerm) {
if (ifPrimaryTerm != SequenceNumbers.UNASSIGNED_PRIMARY_TERM) {
return putParam("if_primary_term", Long.toString(ifPrimaryTerm));
}
return this;
}

Params withWaitForActiveShards(ActiveShardCount activeShardCount) {
return withWaitForActiveShards(activeShardCount, ActiveShardCount.DEFAULT);
}

Params withIndicesOptions(IndicesOptions indicesOptions) {
if (indicesOptions != null) {
withIgnoreUnavailable(indicesOptions.ignoreUnavailable());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ static Request putWatch(PutWatchRequest putWatchRequest) {
.build();

Request request = new Request(HttpPut.METHOD_NAME, endpoint);
RequestConverters.Params params = new RequestConverters.Params(request).withVersion(putWatchRequest.getVersion());
RequestConverters.Params params = new RequestConverters.Params(request)
.withVersion(putWatchRequest.getVersion())
.withIfSeqNo(putWatchRequest.ifSeqNo())
.withIfPrimaryTerm(putWatchRequest.ifPrimaryTerm());
if (putWatchRequest.isActive() == false) {
params.putParam("active", "false");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,14 @@
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

public class GetWatchResponse {
private final String id;
private final long version;
private final long seqNo;
private final long primaryTerm;
private final WatchStatus status;

private final BytesReference source;
Expand All @@ -43,15 +48,18 @@ public class GetWatchResponse {
* Ctor for missing watch
*/
public GetWatchResponse(String id) {
this(id, Versions.NOT_FOUND, null, null, null);
this(id, Versions.NOT_FOUND, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, null, null, null);
}

public GetWatchResponse(String id, long version, WatchStatus status, BytesReference source, XContentType xContentType) {
public GetWatchResponse(String id, long version, long seqNo, long primaryTerm, WatchStatus status,
BytesReference source, XContentType xContentType) {
this.id = id;
this.version = version;
this.status = status;
this.source = source;
this.xContentType = xContentType;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
}

public String getId() {
Expand All @@ -62,6 +70,14 @@ public long getVersion() {
return version;
}

public long getSeqNo() {
return seqNo;
}

public long getPrimaryTerm() {
return primaryTerm;
}

public boolean isFound() {
return version != Versions.NOT_FOUND;
}
Expand Down Expand Up @@ -111,6 +127,8 @@ public int hashCode() {
private static final ParseField ID_FIELD = new ParseField("_id");
private static final ParseField FOUND_FIELD = new ParseField("found");
private static final ParseField VERSION_FIELD = new ParseField("_version");
private static final ParseField SEQ_NO_FIELD = new ParseField("_seq_no");
private static final ParseField PRIMARY_TERM_FIELD = new ParseField("_primary_term");
private static final ParseField STATUS_FIELD = new ParseField("status");
private static final ParseField WATCH_FIELD = new ParseField("watch");

Expand All @@ -119,9 +137,10 @@ public int hashCode() {
a -> {
boolean isFound = (boolean) a[1];
if (isFound) {
XContentBuilder builder = (XContentBuilder) a[4];
XContentBuilder builder = (XContentBuilder) a[6];
BytesReference source = BytesReference.bytes(builder);
return new GetWatchResponse((String) a[0], (long) a[2], (WatchStatus) a[3], source, builder.contentType());
return new GetWatchResponse((String) a[0], (long) a[2], (long) a[3], (long) a[4], (WatchStatus) a[5],
source, builder.contentType());
} else {
return new GetWatchResponse((String) a[0]);
}
Expand All @@ -131,6 +150,8 @@ public int hashCode() {
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_FIELD);
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), FOUND_FIELD);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), VERSION_FIELD);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), SEQ_NO_FIELD);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), PRIMARY_TERM_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(parser, context) -> WatchStatus.parse(parser), STATUS_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.seqno.SequenceNumbers;

import java.util.Objects;
import java.util.regex.Pattern;

import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

/**
* This request class contains the data needed to create a watch along with the name of the watch.
* The name of the watch will become the ID of the indexed document.
Expand All @@ -42,6 +46,9 @@ public final class PutWatchRequest implements Validatable {
private final XContentType xContentType;
private boolean active = true;
private long version = Versions.MATCH_ANY;
private long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;


public PutWatchRequest(String id, BytesReference source, XContentType xContentType) {
Objects.requireNonNull(id, "watch id is missing");
Expand Down Expand Up @@ -98,6 +105,56 @@ public void setVersion(long version) {
this.version = version;
}

/**
* only performs this put request if the watch's last modification was assigned the given
* sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}
*
* If the watch's last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public PutWatchRequest setIfSeqNo(long seqNo) {
if (seqNo < 0 && seqNo != UNASSIGNED_SEQ_NO) {
throw new IllegalArgumentException("sequence numbers must be non negative. got [" + seqNo + "].");
}
ifSeqNo = seqNo;
return this;
}

/**
* only performs this put request if the watch's last modification was assigned the given
* primary term. Must be used in combination with {@link #setIfSeqNo(long)}
*
* If the watch last modification was assigned a different term a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public PutWatchRequest setIfPrimaryTerm(long term) {
if (term < 0) {
throw new IllegalArgumentException("primary term must be non negative. got [" + term + "]");
}
ifPrimaryTerm = term;
return this;
}

/**
* If set, only perform this put watch request if the watch's last modification was assigned this sequence number.
* If the watch last last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public long ifSeqNo() {
return ifSeqNo;
}

/**
* If set, only perform this put watch request if the watch's last modification was assigned this primary term.
*
* If the watch's last modification was assigned a different term a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public long ifPrimaryTerm() {
return ifPrimaryTerm;
}


public static boolean isValidId(String id) {
return Strings.isEmpty(id) == false && NO_WS_PATTERN.matcher(id).matches();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.seqno.SequenceNumbers;

import java.io.IOException;
import java.util.Objects;
Expand All @@ -32,20 +33,26 @@ public class PutWatchResponse {

static {
PARSER.declareString(PutWatchResponse::setId, new ParseField("_id"));
PARSER.declareLong(PutWatchResponse::setSeqNo, new ParseField("_seq_no"));
PARSER.declareLong(PutWatchResponse::setPrimaryTerm, new ParseField("_primary_term"));
PARSER.declareLong(PutWatchResponse::setVersion, new ParseField("_version"));
PARSER.declareBoolean(PutWatchResponse::setCreated, new ParseField("created"));
}

private String id;
private long version;
private long seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
private boolean created;

public PutWatchResponse() {
}

public PutWatchResponse(String id, long version, boolean created) {
public PutWatchResponse(String id, long version, long seqNo, long primaryTerm, boolean created) {
this.id = id;
this.version = version;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.created = created;
}

Expand All @@ -57,6 +64,14 @@ private void setVersion(long version) {
this.version = version;
}

private void setSeqNo(long seqNo) {
this.seqNo = seqNo;
}

private void setPrimaryTerm(long primaryTerm) {
this.primaryTerm = primaryTerm;
}

private void setCreated(boolean created) {
this.created = created;
}
Expand All @@ -69,6 +84,14 @@ public long getVersion() {
return version;
}

public long getSeqNo() {
return seqNo;
}

public long getPrimaryTerm() {
return primaryTerm;
}

public boolean isCreated() {
return created;
}
Expand All @@ -80,12 +103,14 @@ public boolean equals(Object o) {

PutWatchResponse that = (PutWatchResponse) o;

return Objects.equals(id, that.id) && Objects.equals(version, that.version) && Objects.equals(created, that.created);
return Objects.equals(id, that.id) && Objects.equals(version, that.version)
&& Objects.equals(seqNo, that.seqNo)
&& Objects.equals(primaryTerm, that.primaryTerm) && Objects.equals(created, that.created);
}

@Override
public int hashCode() {
return Objects.hash(id, version, created);
return Objects.hash(id, version, seqNo, primaryTerm, created);
}

public static PutWatchResponse fromXContent(XContentParser parser) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,10 @@
import java.util.concurrent.atomic.AtomicReference;

import static java.util.Collections.singletonMap;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;
Expand Down Expand Up @@ -547,23 +549,45 @@ public void testUpdate() throws IOException {
IndexResponse indexResponse = highLevelClient().index(indexRequest, RequestOptions.DEFAULT);
assertEquals(RestStatus.CREATED, indexResponse.status());

UpdateRequest updateRequest = new UpdateRequest("index", "type", "id");
updateRequest.doc(singletonMap("field", "updated"), randomFrom(XContentType.values()));

UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync);
assertEquals(RestStatus.OK, updateResponse.status());
assertEquals(indexResponse.getVersion() + 1, updateResponse.getVersion());

UpdateRequest updateRequestConflict = new UpdateRequest("index", "type", "id");
updateRequestConflict.doc(singletonMap("field", "with_version_conflict"), randomFrom(XContentType.values()));
updateRequestConflict.version(indexResponse.getVersion());
long lastUpdateSeqNo;
long lastUpdatePrimaryTerm;
{
UpdateRequest updateRequest = new UpdateRequest("index", "type", "id");
updateRequest.doc(singletonMap("field", "updated"), randomFrom(XContentType.values()));
final UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync);
assertEquals(RestStatus.OK, updateResponse.status());
assertEquals(indexResponse.getVersion() + 1, updateResponse.getVersion());
lastUpdateSeqNo = updateResponse.getSeqNo();
lastUpdatePrimaryTerm = updateResponse.getPrimaryTerm();
assertThat(lastUpdateSeqNo, greaterThanOrEqualTo(0L));
assertThat(lastUpdatePrimaryTerm, greaterThanOrEqualTo(1L));
}

ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () ->
execute(updateRequestConflict, highLevelClient()::update, highLevelClient()::updateAsync,
highLevelClient()::update, highLevelClient()::updateAsync));
assertEquals(RestStatus.CONFLICT, exception.status());
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[type][id]: version conflict, " +
"current version [2] is different than the one provided [1]]", exception.getMessage());
{
final UpdateRequest updateRequest = new UpdateRequest("index", "type", "id");
updateRequest.doc(singletonMap("field", "with_seq_no_conflict"), randomFrom(XContentType.values()));
if (randomBoolean()) {
updateRequest.setIfSeqNo(lastUpdateSeqNo + 1);
updateRequest.setIfPrimaryTerm(lastUpdatePrimaryTerm);
} else {
updateRequest.setIfSeqNo(lastUpdateSeqNo + (randomBoolean() ? 0 : 1));
updateRequest.setIfPrimaryTerm(lastUpdatePrimaryTerm + 1);
}
ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () ->
execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync));
assertEquals(exception.toString(),RestStatus.CONFLICT, exception.status());
assertThat(exception.getMessage(), containsString("Elasticsearch exception [type=version_conflict_engine_exception"));
}
{
final UpdateRequest updateRequest = new UpdateRequest("index", "type", "id");
updateRequest.doc(singletonMap("field", "with_seq_no"), randomFrom(XContentType.values()));
updateRequest.setIfSeqNo(lastUpdateSeqNo);
updateRequest.setIfPrimaryTerm(lastUpdatePrimaryTerm);
final UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync);
assertEquals(RestStatus.OK, updateResponse.status());
assertEquals(lastUpdateSeqNo + 1, updateResponse.getSeqNo());
assertEquals(lastUpdatePrimaryTerm, updateResponse.getPrimaryTerm());
}
}
{
ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,18 @@ private static XContentBuilder toXContent(PutWatchResponse response, XContentBui
return builder.startObject()
.field("_id", response.getId())
.field("_version", response.getVersion())
.field("_seq_no", response.getSeqNo())
.field("_primary_term", response.getPrimaryTerm())
.field("created", response.isCreated())
.endObject();
}

private static PutWatchResponse createTestInstance() {
String id = randomAlphaOfLength(10);
long seqNo = randomNonNegativeLong();
long primaryTerm = randomLongBetween(1, 200);
long version = randomLongBetween(1, 10);
boolean created = randomBoolean();
return new PutWatchResponse(id, version, created);
return new PutWatchResponse(id, version, seqNo, primaryTerm, created);
}
}
2 changes: 1 addition & 1 deletion docs/reference/docs/delete.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ The result of the above delete operation is:
[[optimistic-concurrency-control-delete]]
=== Optimistic concurrency control

Delete operations can be made optional and only be performed if the last
Delete operations can be made conditional and only be performed if the last
modification to the document was assigned the sequence number and primary
term specified by the `if_seq_no` and `if_primary_term` parameters. If a
mismatch is detected, the operation will result in a `VersionConflictException`
Expand Down
Loading

0 comments on commit c7ef318

Please sign in to comment.