Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove versionType from translog #31945

Merged
merged 14 commits into from
Jul 18, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -523,13 +523,12 @@ private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse
indexRequest.type(), indexRequest.id(), indexRequest.source(), indexRequest.getContentType())
.routing(indexRequest.routing());
result = replica.applyIndexOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(),
indexRequest.versionType().versionTypeForReplicationAndRecovery(), indexRequest.getAutoGeneratedTimestamp(),
indexRequest.isRetry(), sourceToParse);
indexRequest.getAutoGeneratedTimestamp(), indexRequest.isRetry(), sourceToParse);
break;
case DELETE:
DeleteRequest deleteRequest = (DeleteRequest) docWriteRequest;
result = replica.applyDeleteOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(),
deleteRequest.type(), deleteRequest.id(), deleteRequest.versionType().versionTypeForReplicationAndRecovery());
deleteRequest.type(), deleteRequest.id());
break;
default:
throw new IllegalStateException("Unexpected request operation type on replica: "
Expand Down
15 changes: 0 additions & 15 deletions server/src/main/java/org/elasticsearch/index/VersionType.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,6 @@ public boolean validateVersionForReads(long version) {
// not allowing Versions.NOT_FOUND as it is not a valid input value.
return version > 0L || version == Versions.MATCH_ANY;
}

@Override
public VersionType versionTypeForReplicationAndRecovery() {
// replicas get the version from the primary after increment. The same version is stored in
// the transaction log. -> the should use the external semantics.
return EXTERNAL;
}
},
EXTERNAL((byte) 1) {
@Override
Expand Down Expand Up @@ -333,14 +326,6 @@ public byte getValue() {
*/
public abstract boolean validateVersionForReads(long version);

/**
* Some version types require different semantics for primary and replicas. This version allows
* the type to override the default behavior.
*/
public VersionType versionTypeForReplicationAndRecovery() {
return this;
}

public static VersionType fromString(String versionType) {
if ("internal".equals(versionType)) {
return INTERNAL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -704,20 +704,6 @@ private boolean canOptimizeAddDocument(Index index) {
return false;
}

private boolean assertVersionType(final Engine.Operation operation) {
if (operation.origin() == Operation.Origin.REPLICA ||
operation.origin() == Operation.Origin.PEER_RECOVERY ||
operation.origin() == Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
// ensure that replica operation has expected version type for replication
// ensure that versionTypeForReplicationAndRecovery is idempotent
assert operation.versionType() == operation.versionType().versionTypeForReplicationAndRecovery()
: "unexpected version type in request from [" + operation.origin().name() + "] " +
"found [" + operation.versionType().name() + "] " +
"expected [" + operation.versionType().versionTypeForReplicationAndRecovery().name() + "]";
}
return true;
}

private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) {
if (origin == Operation.Origin.PRIMARY) {
assert assertOriginPrimarySequenceNumber(seqNo);
Expand Down Expand Up @@ -757,7 +743,6 @@ public IndexResult index(Index index) throws IOException {
try (ReleasableLock releasableLock = readLock.acquire()) {
ensureOpen();
assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
assert assertVersionType(index);
try (Releasable ignored = versionMap.acquireLock(index.uid().bytes());
Releasable indexThrottle = doThrottle ? () -> {} : throttle.acquireThrottle()) {
lastWriteNanos = index.startTime();
Expand Down Expand Up @@ -860,9 +845,6 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio
"max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of index [" + index.seqNo() + "]";
}
versionMap.enforceSafeAccess();
// drop out of order operations
assert index.versionType().versionTypeForReplicationAndRecovery() == index.versionType() :
"resolving out of order delivery based on versioning but version type isn't fit for it. got [" + index.versionType() + "]";
// unlike the primary, replicas don't really care to about creation status of documents
// this allows to ignore the case where a document was found in the live version maps in
// a delete state and return false for the created flag in favor of code simplicity
Expand Down Expand Up @@ -1096,7 +1078,6 @@ private void updateDocs(final Term uid, final List<ParseContext.Document> docs,
public DeleteResult delete(Delete delete) throws IOException {
versionMap.enforceSafeAccess();
assert Objects.equals(delete.uid().field(), IdFieldMapper.NAME) : delete.uid().field();
assert assertVersionType(delete);
assert assertIncomingSequenceNumber(delete.origin(), delete.seqNo());
final DeleteResult deleteResult;
// NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments:
Expand Down Expand Up @@ -1149,10 +1130,6 @@ public DeleteResult delete(Delete delete) throws IOException {

private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException {
assert delete.origin() != Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
// drop out of order operations
assert delete.versionType().versionTypeForReplicationAndRecovery() == delete.versionType() :
"resolving out of order delivery based on versioning but version type isn't fit for it. got ["
+ delete.versionType() + "]";
maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(delete.seqNo(), curr));
assert maxSeqNoOfNonAppendOnlyOperations.get() >= delete.seqNo() : "max_seqno of non-append-only was not updated;" +
"max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of delete [" + delete.seqNo() + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -649,10 +649,10 @@ public Engine.IndexResult applyIndexOperationOnPrimary(long version, VersionType
isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse);
}

public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, VersionType versionType,
long autoGeneratedTimeStamp, boolean isRetry, SourceToParse sourceToParse)
public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, long autoGeneratedTimeStamp,
boolean isRetry, SourceToParse sourceToParse)
throws IOException {
return applyIndexOperation(seqNo, primaryTerm, version, versionType, autoGeneratedTimeStamp, isRetry,
return applyIndexOperation(seqNo, primaryTerm, version, VersionType.EXTERNAL, autoGeneratedTimeStamp, isRetry,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe it's better to pass null instead of VersionType.EXTERNAL? this way people may think it's the original one?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we do that, maybe assert in the Engine.Op constructor that if the origin is not a primary, version type is null?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on this suggestion

Engine.Operation.Origin.REPLICA, sourceToParse);
}

Expand Down Expand Up @@ -740,9 +740,8 @@ public Engine.DeleteResult applyDeleteOperationOnPrimary(long version, String ty
Engine.Operation.Origin.PRIMARY);
}

public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id,
VersionType versionType) throws IOException {
return applyDeleteOperation(seqNo, primaryTerm, version, type, id, versionType, Engine.Operation.Origin.REPLICA);
public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id) throws IOException {
return applyDeleteOperation(seqNo, primaryTerm, version, type, id, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some comment as before

}

private Engine.DeleteResult applyDeleteOperation(long seqNo, long opPrimaryTerm, long version, String type, String id,
Expand Down Expand Up @@ -1211,14 +1210,14 @@ public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine
// we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all
// autoGeneratedID docs that are coming from the primary are updated correctly.
result = applyIndexOperation(index.seqNo(), index.primaryTerm(), index.version(),
index.versionType().versionTypeForReplicationAndRecovery(), index.getAutoGeneratedIdTimestamp(), true, origin,
VersionType.EXTERNAL, index.getAutoGeneratedIdTimestamp(), true, origin,
source(shardId.getIndexName(), index.type(), index.id(), index.source(),
XContentHelper.xContentType(index.source())).routing(index.routing()));
break;
case DELETE:
final Translog.Delete delete = (Translog.Delete) operation;
result = applyDeleteOperation(delete.seqNo(), delete.primaryTerm(), delete.version(), delete.type(), delete.id(),
delete.versionType().versionTypeForReplicationAndRecovery(), origin);
VersionType.EXTERNAL, origin);
break;
case NO_OP:
final Translog.NoOp noOp = (Translog.NoOp) operation;
Expand Down
49 changes: 16 additions & 33 deletions server/src/main/java/org/elasticsearch/index/translog/Translog.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
Expand Down Expand Up @@ -1011,15 +1010,15 @@ public static class Index implements Operation {

public static final int FORMAT_6_0 = 8; // since 6.0.0
public static final int FORMAT_NO_PARENT = FORMAT_6_0 + 1; // since 7.0
public static final int SERIALIZATION_FORMAT = FORMAT_NO_PARENT;
public static final int FORMAT_NO_VERSION_TYPE = FORMAT_NO_PARENT + 1; // since 7.0
public static final int SERIALIZATION_FORMAT = FORMAT_NO_VERSION_TYPE;

private final String id;
private final long autoGeneratedIdTimestamp;
private final String type;
private final long seqNo;
private final long primaryTerm;
private final long version;
private final VersionType versionType;
private final BytesReference source;
private final String routing;

Expand All @@ -1034,8 +1033,9 @@ private Index(final StreamInput in) throws IOException {
in.readOptionalString(); // _parent
}
this.version = in.readLong();
this.versionType = VersionType.fromValue(in.readByte());
assert versionType.validateVersionForWrites(this.version) : "invalid version for writes: " + this.version;
if (format < FORMAT_NO_VERSION_TYPE) {
in.readByte(); // _version_type
}
this.autoGeneratedIdTimestamp = in.readLong();
seqNo = in.readLong();
primaryTerm = in.readLong();
Expand All @@ -1049,23 +1049,21 @@ public Index(Engine.Index index, Engine.IndexResult indexResult) {
this.seqNo = indexResult.getSeqNo();
this.primaryTerm = index.primaryTerm();
this.version = indexResult.getVersion();
this.versionType = index.versionType();
this.autoGeneratedIdTimestamp = index.getAutoGeneratedIdTimestamp();
}

public Index(String type, String id, long seqNo, long primaryTerm, byte[] source) {
this(type, id, seqNo, primaryTerm, Versions.MATCH_ANY, VersionType.INTERNAL, source, null, -1);
this(type, id, seqNo, primaryTerm, Versions.MATCH_ANY, source, null, -1);
}

public Index(String type, String id, long seqNo, long primaryTerm, long version, VersionType versionType,
public Index(String type, String id, long seqNo, long primaryTerm, long version,
byte[] source, String routing, long autoGeneratedIdTimestamp) {
this.type = type;
this.id = id;
this.source = new BytesArray(source);
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.version = version;
this.versionType = versionType;
this.routing = routing;
this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
}
Expand Down Expand Up @@ -1110,10 +1108,6 @@ public long version() {
return this.version;
}

public VersionType versionType() {
return versionType;
}

@Override
public Source getSource() {
return new Source(source, routing);
Expand All @@ -1126,8 +1120,6 @@ private void write(final StreamOutput out) throws IOException {
out.writeBytesReference(source);
out.writeOptionalString(routing);
out.writeLong(version);

out.writeByte(versionType.getValue());
out.writeLong(autoGeneratedIdTimestamp);
out.writeLong(seqNo);
out.writeLong(primaryTerm);
Expand All @@ -1149,7 +1141,6 @@ public boolean equals(Object o) {
primaryTerm != index.primaryTerm ||
id.equals(index.id) == false ||
type.equals(index.type) == false ||
versionType != index.versionType ||
autoGeneratedIdTimestamp != index.autoGeneratedIdTimestamp ||
source.equals(index.source) == false) {
return false;
Expand All @@ -1168,7 +1159,6 @@ public int hashCode() {
result = 31 * result + Long.hashCode(seqNo);
result = 31 * result + Long.hashCode(primaryTerm);
result = 31 * result + Long.hashCode(version);
result = 31 * result + versionType.hashCode();
result = 31 * result + source.hashCode();
result = 31 * result + (routing != null ? routing.hashCode() : 0);
result = 31 * result + Long.hashCode(autoGeneratedIdTimestamp);
Expand All @@ -1194,14 +1184,14 @@ public long getAutoGeneratedIdTimestamp() {
public static class Delete implements Operation {

private static final int FORMAT_6_0 = 4; // 6.0 - *
public static final int SERIALIZATION_FORMAT = FORMAT_6_0;
public static final int FORMAT_NO_VERSION_TYPE = FORMAT_6_0 + 1; // since 7.0
public static final int SERIALIZATION_FORMAT = FORMAT_NO_VERSION_TYPE;

private final String type, id;
private final Term uid;
private final long seqNo;
private final long primaryTerm;
private final long version;
private final VersionType versionType;

private Delete(final StreamInput in) throws IOException {
final int format = in.readVInt();// SERIALIZATION_FORMAT
Expand All @@ -1210,29 +1200,29 @@ private Delete(final StreamInput in) throws IOException {
id = in.readString();
uid = new Term(in.readString(), in.readBytesRef());
this.version = in.readLong();
this.versionType = VersionType.fromValue(in.readByte());
assert versionType.validateVersionForWrites(this.version);
if (format < FORMAT_NO_VERSION_TYPE) {
in.readByte(); // versionType
}
seqNo = in.readLong();
primaryTerm = in.readLong();
}

public Delete(Engine.Delete delete, Engine.DeleteResult deleteResult) {
this(delete.type(), delete.id(), delete.uid(), deleteResult.getSeqNo(), delete.primaryTerm(), deleteResult.getVersion(), delete.versionType());
this(delete.type(), delete.id(), delete.uid(), deleteResult.getSeqNo(), delete.primaryTerm(), deleteResult.getVersion());
}

/** utility for testing */
public Delete(String type, String id, long seqNo, long primaryTerm, Term uid) {
this(type, id, uid, seqNo, primaryTerm, Versions.MATCH_ANY, VersionType.INTERNAL);
this(type, id, uid, seqNo, primaryTerm, Versions.MATCH_ANY);
}

public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType) {
public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version) {
this.type = Objects.requireNonNull(type);
this.id = Objects.requireNonNull(id);
this.uid = uid;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.version = version;
this.versionType = versionType;
}

@Override
Expand Down Expand Up @@ -1271,10 +1261,6 @@ public long version() {
return this.version;
}

public VersionType versionType() {
return this.versionType;
}

@Override
public Source getSource() {
throw new IllegalStateException("trying to read doc source from delete operation");
Expand All @@ -1287,7 +1273,6 @@ private void write(final StreamOutput out) throws IOException {
out.writeString(uid.field());
out.writeBytesRef(uid.bytes());
out.writeLong(version);
out.writeByte(versionType.getValue());
out.writeLong(seqNo);
out.writeLong(primaryTerm);
}
Expand All @@ -1306,8 +1291,7 @@ public boolean equals(Object o) {
return version == delete.version &&
seqNo == delete.seqNo &&
primaryTerm == delete.primaryTerm &&
uid.equals(delete.uid) &&
versionType == delete.versionType;
uid.equals(delete.uid);
}

@Override
Expand All @@ -1316,7 +1300,6 @@ public int hashCode() {
result = 31 * result + Long.hashCode(seqNo);
result = 31 * result + Long.hashCode(primaryTerm);
result = 31 * result + Long.hashCode(version);
result = 31 * result + versionType.hashCode();
return result;
}

Expand Down
Loading