Remove TranslogRecoveryPerformer #24858
Conversation
945a7d7 → b0ad489
b0ad489 → ca5b9f9
I'm +0 on this (with one exception, see comment) and OK with merging it. @s1monw, since you have designed the original scheme, I think it's good if you look as well.
/**
 * Returns statistics object for the translog. Used during translog recovery, see also {@link Engine#recoverFromTranslog()}
 */
public RecoveryState.Translog getTranslogStats() {
nit: call this (and derivatives) getRecoveryTranslogStats?
for (Translog.Operation op : operations) {
    Engine.Operation engineOp = indexShard().convertToEngineOp(op, Engine.Operation.Origin.PEER_RECOVERY);
    if (engineOp instanceof Engine.Index && ((Engine.Index) engineOp).parsedDoc().dynamicMappingsUpdate() != null) {
        translog.decrementRecoveredOperations(completedOps); // clean-up stats
I think this is a tricky place to put this: it doesn't really know what the retry semantics are (that we always retry a full batch). This is why we had the BatchOperationException. If we want to remove it from the "official exception list" (+1 on that), we can still make BatchOperationException a dedicated non-ElasticsearchException by always rethrowing its cause.
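The full-batch retry semantics behind this comment can be illustrated with a small counter sketch. All types and names below are hypothetical stand-ins, not the actual Elasticsearch classes: the point is only that per-operation stats recorded before a mid-batch failure must be rolled back, because the retry replays the entire batch and would otherwise double-count.

```java
import java.util.List;

// Minimal sketch (stand-in types, assumptions only) of why the recovered-ops
// counter is decremented on a batch failure: recovery always retries a FULL
// batch, so partial progress must be removed from the stats first.
public class BatchRetryStatsSketch {

    static class TranslogStats {
        private int recoveredOperations = 0;
        void incrementRecoveredOperations() { recoveredOperations++; }
        void decrementRecoveredOperations(int n) { recoveredOperations -= n; }
        int recoveredOperations() { return recoveredOperations; }
    }

    /** Applies a batch; failAt < 0 means no failure. Rolls back stats on failure. */
    static boolean applyBatch(TranslogStats stats, List<String> batch, int failAt) {
        int completedOps = 0;
        try {
            for (String op : batch) {
                if (completedOps == failAt) {
                    throw new IllegalStateException("mapping update required for [" + op + "]");
                }
                stats.incrementRecoveredOperations();
                completedOps++;
            }
            return true;
        } catch (IllegalStateException e) {
            stats.decrementRecoveredOperations(completedOps); // clean-up stats before the full-batch retry
            return false;
        }
    }

    public static void main(String[] args) {
        TranslogStats stats = new TranslogStats();
        List<String> batch = List.of("op1", "op2", "op3");

        // First attempt fails after two ops; stats are rolled back to zero.
        boolean ok = applyBatch(stats, batch, 2);
        if (ok || stats.recoveredOperations() != 0) throw new AssertionError();

        // Retrying the full batch succeeds and counts each op exactly once.
        ok = applyBatch(stats, batch, -1);
        if (!ok || stats.recoveredOperations() != 3) throw new AssertionError();
        System.out.println("recovered=" + stats.recoveredOperations());
    }
}
```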
can we also have mapping updates for deletes? I wonder if that is the case now that we allow type introduction for deletes too?!
I think the mapping case is tricky too. I am not sure if we can hit it because of a broken mapping or something similar, in which case we should fail the recovery. Maybe we can use DelayRecoveryException for this purpose instead? It's really nothing else but a delay.
I've changed the flow of this method to first do the conversion for all the operations in the batch and then only proceed with the actual indexing once we've confirmed that there were no mapping updates. This makes the BatchOperationException obsolete.
@s1monw yes, the method TransportShardBulkAction.executeDeleteRequestOnReplica currently has different mapping handling than our recovery code here. The main motivation for this PR was to change some of the internal APIs to address the divergence between the recovery and replication code. We can fix the actual divergences in a follow-up.
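The reworked flow described above can be sketched with stand-in types (all names below are illustrative assumptions, not the real Elasticsearch classes): convert every operation in the batch up front, and only touch the engine once the whole batch converted cleanly, so a mapping failure leaves nothing to roll back.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the convert-first-then-apply flow. If phase 1 throws, NOTHING has
// been indexed yet, so there is no partial progress to undo (which is what
// makes a BatchOperationException-style wrapper obsolete).
public class ConvertThenApplySketch {

    record TranslogOp(String id, boolean needsMappingUpdate) {}
    record EngineOp(String id) {}

    static class MapperException extends RuntimeException {
        MapperException(String message) { super(message); }
    }

    static EngineOp convertToEngineOp(TranslogOp op) {
        if (op.needsMappingUpdate()) {
            throw new MapperException("mapping update required for [" + op.id() + "]");
        }
        return new EngineOp(op.id());
    }

    static int indexTranslogOperations(List<TranslogOp> batch, List<String> applied) {
        // phase 1: convert the whole batch, failing fast on mapping updates
        List<EngineOp> engineOps = new ArrayList<>();
        for (TranslogOp op : batch) {
            engineOps.add(convertToEngineOp(op));
        }
        // phase 2: only now apply, knowing no op needs a mapping update
        for (EngineOp op : engineOps) {
            applied.add(op.id()); // stands in for engine.index(op) / engine.delete(op)
        }
        return engineOps.size();
    }

    public static void main(String[] args) {
        List<String> applied = new ArrayList<>();
        if (indexTranslogOperations(List.of(new TranslogOp("a", false), new TranslogOp("b", false)), applied) != 2) {
            throw new AssertionError();
        }

        // a batch containing a mapping update fails before anything is applied
        List<String> applied2 = new ArrayList<>();
        boolean threw = false;
        try {
            indexTranslogOperations(List.of(new TranslogOp("c", false), new TranslogOp("d", true)), applied2);
        } catch (MapperException e) {
            threw = true;
        }
        if (!threw || !applied2.isEmpty()) throw new AssertionError();
        System.out.println("applied=" + applied.size() + " partial=" + applied2.size());
    }
}
```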
…he recovery logic from a snapshot
the latest commit pushed me over to +1...
first I got excited, I thought @ywelsch found a way to encapsulate the translog recovery entirely inside the engine... Well, I am still excited since it's moving things into the right places IMO. I also like getting rid of another exception. I left some suggestions. Thanks @ywelsch for cleaning things up.
@@ -375,12 +378,26 @@ public void ensureClusterStateVersion(long clusterStateVersion) {
     }

     @Override
-    public long indexTranslogOperations(
-        List<Translog.Operation> operations, int totalTranslogOps) throws TranslogRecoveryPerformer.BatchOperationException {
+    public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws MapperException, IOException {
any chance we can get a unit test for this?
There is already extensive test coverage for this (e.g. test subclasses of ESIndexLevelReplicationTestCase).
    exception);
final RecoveryState.Translog translog = recoveryTarget.state().getTranslog();
translog.decrementRecoveredOperations(exception.completedOperations()); // do the maintenance and roll back completed ops
logger.trace("delaying recovery due to missing mapping changes", exception);
I am leaning towards making this a debug statement or removing it entirely. On balance I think it's worth keeping as a debug statement.
ok, I've changed it to "debug" level in 6535937
    return mapperService.documentMapperWithAutoCreate(type); // protected for testing
}

public Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) {
this class seems simple enough to be unit-tested? Maybe we can add a test based on IndexShardTestCase
added in 0d2e800
    return translogOpToEngineOpConverter.convertToEngineOp(operation, origin);
}

private int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOException {
any chance we can add a test for this to IndexShardTests?
added in 658b889
@@ -552,6 +554,34 @@ private IndexShardState changeState(IndexShardState newState, String reason) {
     return new Engine.Index(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry);
 }

+public Engine.Result applyOperation(Engine.Operation operation) throws IOException {
please add a javadoc to this.
added in 61f17e7
…eryperformer-out-of-engine-config
LGTM
Thx @ywelsch
    exception);
final RecoveryState.Translog translog = recoveryTarget.state().getTranslog();
translog.decrementRecoveredOperations(exception.completedOperations()); // do the maintenance and roll back completed ops
logger.debug("delaying recovery due to missing mapping changes", exception);
unrelated, but can this still happen today? do we want to assert here?
I think this can still happen (although very rarely). In theory, we could avoid this by doing something similar to calling recoveryTarget.ensureClusterStateVersion(currentClusterStateVersion) before phase2. The tricky bit is that the current primary might have indexed something based on a mapping change that has not been fully applied yet (i.e. it is being applied but not yet available under ClusterService.state()); in this case we would rather want to know about the pre-applied state.
    throw new IndexShardNotRecoveringException(shardId, indexShard().state());
}
// first convert all translog operations to engine operations to check for mapping updates
List<Engine.Operation> engineOps = operations.stream().map(
+1
Splits TranslogRecoveryPerformer into three parts:
This makes it possible for peer recovery to use the same IndexShard interface as bulk shard requests (i.e. Engine operations instead of Translog operations). It also pushes the "fail on bad mapping" logic out of IndexShard. Future pull requests could unify the bulk shard and peer recovery paths even further.
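The pieces visible in the diff hunks of this conversation (convertToEngineOp, applyOperation, runTranslogRecovery) compose roughly as sketched below. All types here are simplified stand-ins for illustration, not the actual Elasticsearch classes: a converter turns translog operations into engine operations, applyOperation is the single entry point that bulk requests could share, and runTranslogRecovery drains a snapshot through both.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of how the split-out responsibilities fit together.
public class RecoverySplitSketch {

    record TranslogOp(String uid) {}
    record EngineOp(String uid, String origin) {}

    /** Stand-in for the converter split out of TranslogRecoveryPerformer. */
    static EngineOp convertToEngineOp(TranslogOp op, String origin) {
        return new EngineOp(op.uid(), origin);
    }

    /** Stand-in for IndexShard#applyOperation: the shared entry point for engine ops. */
    static void applyOperation(List<String> engine, EngineOp op) {
        engine.add(op.uid() + "@" + op.origin()); // stands in for engine.index/delete
    }

    /** Stand-in for IndexShard#runTranslogRecovery(Engine, Translog.Snapshot). */
    static int runTranslogRecovery(List<String> engine, Iterator<TranslogOp> snapshot) {
        int opsRecovered = 0;
        while (snapshot.hasNext()) {
            applyOperation(engine, convertToEngineOp(snapshot.next(), "LOCAL_TRANSLOG_RECOVERY"));
            opsRecovered++;
        }
        return opsRecovered;
    }

    public static void main(String[] args) {
        List<String> engine = new ArrayList<>();
        int recovered = runTranslogRecovery(engine,
            List.of(new TranslogOp("1"), new TranslogOp("2"), new TranslogOp("3")).iterator());
        if (recovered != 3 || engine.size() != 3) throw new AssertionError();
        System.out.println("recovered=" + recovered);
    }
}
```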