Skip to content

Commit

Permalink
Rename seq# powered optimistic concurrency control parameters to ifSe…
Browse files Browse the repository at this point in the history
…qNo/ifPrimaryTerm (#36757)

This PR renames the parameters previously introduce to the following:

### URL Parameters
```
PUT twitter/_doc/1?if_seq_no=501&if_primary_term=1
{
    "user" : "kimchy",
    "post_date" : "2009-11-15T14:12:12",
    "message" : "trying out Elasticsearch"
}

DELETE twitter/_doc/1?if_seq_no=501&if_primary_term=1
```

### Bulk API
```
POST _bulk
{ "index" : { "_index" : "test", "_type" : "_doc", "_id" : "1", "if_seq_no": 501, "if_primary_term": 1 } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "_doc", "_id" : "2", "if_seq_no": 501, "if_primary_term": 1 } }
```

### Java API
```
IndexRequest.ifSeqNo(long seqNo)
IndexRequest.ifPrimaryTerm(long primaryTerm)
DeleteRequest.ifSeqNo(long seqNo)
DeleteRequest.ifPrimaryTerm(long primaryTerm)
```

Relates #36148
Relates #10708
  • Loading branch information
bleskes committed Dec 18, 2018
1 parent a8a7e6e commit 1db1565
Show file tree
Hide file tree
Showing 16 changed files with 248 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@
"type" : "time",
"description" : "Explicit operation timeout"
},
"if_seq_no_match" : {
"if_seq_no" : {
"type" : "number",
"description" : "only perform the delete operation if the last operation that has changed the document has the specified sequence number"
},
"if_primary_term_match" : {
"if_primary_term" : {
"type" : "number",
"description" : "only perform the delete operation if the last operation that has changed the document has the specified primary term"
},
Expand Down
4 changes: 2 additions & 2 deletions rest-api-spec/src/main/resources/rest-api-spec/api/index.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@
"options" : ["internal", "external", "external_gte", "force"],
"description" : "Specific version type"
},
"if_seq_no_match" : {
"if_seq_no" : {
"type" : "number",
"description" : "only perform the index operation if the last operation that has changed the document has the specified sequence number"
},
"if_primary_term_match" : {
"if_primary_term" : {
"type" : "number",
"description" : "only perform the index operation if the last operation that has changed the document has the specified primary term"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- do:
index:
index: test_1
type: type
id: 1
body: { foo: bar }
- match: { _version: 1}
Expand All @@ -17,6 +18,7 @@
- do:
get:
index: test_1
type: type
id: 1
- match: { _seq_no: $seqno }
- match: { _primary_term: $primary_term }
Expand All @@ -25,26 +27,29 @@
catch: conflict
index:
index: test_1
type: type
id: 1
if_seq_no_match: 10000
if_primary_term_match: $primary_term
if_seq_no: 10000
if_primary_term: $primary_term
body: { foo: bar2 }

- do:
catch: conflict
index:
index: test_1
type: type
id: 1
if_seq_no_match: $seqno
if_primary_term_match: 1000
if_seq_no: $seqno
if_primary_term: 1000
body: { foo: bar2 }

- do:
index:
index: test_1
type: type
id: 1
if_seq_no_match: $seqno
if_primary_term_match: $primary_term
if_seq_no: $seqno
if_primary_term: $primary_term
body: { foo: bar2 }

- match: { _version: 2 }
25 changes: 13 additions & 12 deletions server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
private static final ParseField PIPELINE = new ParseField("pipeline");
private static final ParseField FIELDS = new ParseField("fields");
private static final ParseField SOURCE = new ParseField("_source");
private static final ParseField IF_SEQ_NO_MATCH = new ParseField("if_seq_no_match");
private static final ParseField IF_PRIMARY_TERM_MATCH = new ParseField("if_primary_term_match");
private static final ParseField IF_SEQ_NO = new ParseField("if_seq_no");
private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term");

/**
* Requests that are part of this request. It is only possible to add things that are both {@link ActionRequest}s and
Expand Down Expand Up @@ -357,8 +357,8 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null
String opType = null;
long version = Versions.MATCH_ANY;
VersionType versionType = VersionType.INTERNAL;
long ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO;
long ifPrimaryTermMatch = 0;
long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
long ifPrimaryTerm = 0;
int retryOnConflict = 0;
String pipeline = valueOrDefault(defaultPipeline, globalPipeline);

Expand Down Expand Up @@ -391,10 +391,10 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null
version = parser.longValue();
} else if (VERSION_TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
versionType = VersionType.fromString(parser.text());
} else if (IF_SEQ_NO_MATCH.match(currentFieldName, parser.getDeprecationHandler())) {
ifSeqNoMatch = parser.longValue();
} else if (IF_PRIMARY_TERM_MATCH.match(currentFieldName, parser.getDeprecationHandler())) {
ifPrimaryTermMatch = parser.longValue();
} else if (IF_SEQ_NO.match(currentFieldName, parser.getDeprecationHandler())) {
ifSeqNo = parser.longValue();
} else if (IF_PRIMARY_TERM.match(currentFieldName, parser.getDeprecationHandler())) {
ifPrimaryTerm = parser.longValue();
} else if (RETRY_ON_CONFLICT.match(currentFieldName, parser.getDeprecationHandler())) {
retryOnConflict = parser.intValue();
} else if (PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
Expand Down Expand Up @@ -432,7 +432,7 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null

if ("delete".equals(action)) {
add(new DeleteRequest(index, type, id).routing(routing).parent(parent).version(version)
.versionType(versionType).setIfMatch(ifSeqNoMatch, ifPrimaryTermMatch), payload);
.versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm), payload);
} else {
nextMarker = findNextMarker(marker, from, data, length);
if (nextMarker == -1) {
Expand All @@ -445,17 +445,18 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null
if ("index".equals(action)) {
if (opType == null) {
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).version(version)
.versionType(versionType).setPipeline(pipeline).ifMatch(ifSeqNoMatch, ifPrimaryTermMatch)
.versionType(versionType).setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker,xContentType), xContentType), payload);
} else {
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).version(version)
.versionType(versionType).create("create".equals(opType)).setPipeline(pipeline)
.ifMatch(ifSeqNoMatch, ifPrimaryTermMatch)
.setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), payload);
}
} else if ("create".equals(action)) {
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).version(version)
.versionType(versionType).create(true).setPipeline(pipeline).ifMatch(ifSeqNoMatch, ifPrimaryTermMatch)
.versionType(versionType).create(true).setPipeline(pipeline)
.setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), payload);
} else if ("update".equals(action)) {
UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).parent(parent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ private static void executeIndexRequestOnPrimary(BulkPrimaryExecutionContext con
executeOnPrimaryWhileHandlingMappingUpdates(context,
() ->
primary.applyIndexOperationOnPrimary(request.version(), request.versionType(), sourceToParse,
request.ifSeqNoMatch(), request.ifPrimaryTermMatch(), request.getAutoGeneratedTimestamp(), request.isRetry()),
request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry()),
e -> primary.getFailedIndexResult(e, request.version()),
context::markOperationAsExecuted,
mapping -> mappingUpdater.updateMappings(mapping, primary.shardId(), request.type()));
Expand All @@ -473,7 +473,7 @@ private static void executeDeleteRequestOnPrimary(BulkPrimaryExecutionContext co
final IndexShard primary = context.getPrimary();
executeOnPrimaryWhileHandlingMappingUpdates(context,
() -> primary.applyDeleteOperationOnPrimary(request.version(), request.type(), request.id(), request.versionType(),
request.ifSeqNoMatch(), request.ifPrimaryTermMatch()),
request.ifSeqNo(), request.ifPrimaryTerm()),
e -> primary.getFailedDeleteResult(e, request.version()),
context::markOperationAsExecuted,
mapping -> mappingUpdater.updateMappings(mapping, primary.shardId(), request.type()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
private String parent;
private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL;
private long ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long ifPrimaryTermMatch = 0;
private long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long ifPrimaryTerm = 0;

public DeleteRequest() {
}
Expand Down Expand Up @@ -103,11 +103,20 @@ public ActionRequestValidationException validate() {
validationException = addValidationError("version type [force] may no longer be used", validationException);
}

if (ifSeqNoMatch != SequenceNumbers.UNASSIGNED_SEQ_NO && (
if (ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO && (
versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY
)) {
validationException = addValidationError("compare and write operations can not use versioning", validationException);
}

if (ifPrimaryTerm == 0 && ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
validationException = addValidationError("ifSeqNo is set, but primary term is [0]", validationException);
}
if (ifPrimaryTerm != 0 && ifSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
validationException =
addValidationError("ifSeqNo is unassigned, but primary term is [" + ifPrimaryTerm + "]", validationException);
}

return validationException;
}

Expand Down Expand Up @@ -200,29 +209,52 @@ public DeleteRequest versionType(VersionType versionType) {
return this;
}

public long ifSeqNoMatch() {
return ifSeqNoMatch;
/**
* If set, only perform this delete request if the document was last modification was assigned this sequence number.
* If the document last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public long ifSeqNo() {
return ifSeqNo;
}

public long ifPrimaryTermMatch() {
return ifPrimaryTermMatch;
/**
* If set, only perform this delete request if the document was last modification was assigned this primary term.
*
* If the document last modification was assigned a different term a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public long ifPrimaryTerm() {
return ifPrimaryTerm;
}

public DeleteRequest setIfMatch(long seqNo, long term) {
if (term == 0 && seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
throw new IllegalArgumentException("seqNo is set, but primary term is [0]");
}
if (term != 0 && seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
throw new IllegalArgumentException("seqNo is unassigned, but primary term is [" + term + "]");
}
/**
* only perform this delete request if the document was last modification was assigned the given
* sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}
*
* If the document last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public DeleteRequest setIfSeqNo(long seqNo) {
if (seqNo < 0 && seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
throw new IllegalArgumentException("sequence numbers must be non negative. got [" + seqNo + "].");
}
ifSeqNo = seqNo;
return this;
}

/**
* only perform this delete request if the document was last modification was assigned the given
* primary term. Must be used in combination with {@link #setIfSeqNo(long)}
*
* If the document last modification was assigned a different primary term a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public DeleteRequest setIfPrimaryTerm(long term) {
if (term < 0) {
throw new IllegalArgumentException("primary term must be non negative. got [" + term + "]");
}
ifSeqNoMatch = seqNo;
ifPrimaryTermMatch = term;
ifPrimaryTerm = term;
return this;
}

Expand All @@ -246,11 +278,11 @@ public void readFrom(StreamInput in) throws IOException {
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
if (in.getVersion().onOrAfter(Version.V_6_0_0)) {
ifSeqNoMatch = in.readZLong();
ifPrimaryTermMatch = in.readVLong();
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
} else {
ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO;
ifPrimaryTermMatch = 0;
ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
ifPrimaryTerm = 0;
}
}

Expand All @@ -264,10 +296,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(version);
out.writeByte(versionType.getValue());
if (out.getVersion().onOrAfter(Version.V_6_0_0)) {
out.writeZLong(ifSeqNoMatch);
out.writeVLong(ifPrimaryTermMatch);
} else if (ifSeqNoMatch != SequenceNumbers.UNASSIGNED_SEQ_NO || ifPrimaryTermMatch != 0) {
assert false : "setIfMatch [" + ifSeqNoMatch + "], currentDocTem [" + ifPrimaryTermMatch + "]";
out.writeZLong(ifSeqNo);
out.writeVLong(ifPrimaryTerm);
} else if (ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO || ifPrimaryTerm != 0) {
assert false : "setIfMatch [" + ifSeqNo + "], currentDocTem [" + ifPrimaryTerm + "]";
throw new IllegalStateException(
"sequence number based compare and write is not supported until all nodes are on version 7.0 or higher. " +
"Stream version [" + out.getVersion() + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,26 @@ public DeleteRequestBuilder setVersionType(VersionType versionType) {
}

/**
* only performs this delete request if the document was last modification was assigned the given
* sequence number and primary term
* only perform this delete request if the document was last modification was assigned the given
* sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}
*
* If the document last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public DeleteRequestBuilder setIfMatch(long seqNo, long term) {
request.setIfMatch(seqNo, term);
public DeleteRequestBuilder setIfSeqNo(long seqNo) {
request.setIfSeqNo(seqNo);
return this;
}

/**
* only perform this delete request if the document was last modification was assigned the given
* primary term. Must be used in combination with {@link #setIfSeqNo(long)}
*
* If the document last modification was assigned a different term a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public DeleteRequestBuilder setIfPrimaryTerm(long term) {
request.setIfPrimaryTerm(term);
return this;
}

Expand Down
Loading

0 comments on commit 1db1565

Please sign in to comment.