Remove TranslogRecoveryPerformer #24858

@ywelsch ywelsch commented May 24, 2017

Splits TranslogRecoveryPerformer into three parts:

  • the translog operation to engine operation converter
  • the operation performer (that indexes the operation into the engine)
  • the translog statistics (for which there is already RecoveryState.Translog)

This makes it possible for peer recovery to use the same IndexShard interface as bulk shard requests (i.e. Engine operations instead of Translog operations). It also pushes the "fail on bad mapping" logic outside of IndexShard. Future pull requests could unify the BulkShard and peer recovery path even more.
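As a rough illustration of the three-way split described above (not the actual Elasticsearch code), the converter, the performer, and the statistics can be modelled as three independent collaborators that a recovery loop composes. All names here (`SplitRecoverySketch`, `Stats`, `recover`) are hypothetical, and operations are plain strings for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class SplitRecoverySketch {
    /** Part 3 (hypothetical): recovery statistics, kept apart from conversion and indexing. */
    public static final class Stats {
        private int recovered;
        public void incrementRecoveredOperations() { recovered++; }
        public int recoveredOperations() { return recovered; }
    }

    /** Composes the three parts; each one can be swapped or tested in isolation. */
    public static void recover(List<String> translogOps,
                               Function<String, String> converter,
                               Consumer<String> performer,
                               Stats stats) {
        for (String op : translogOps) {
            String engineOp = converter.apply(op);   // part 1: translog op -> engine op
            performer.accept(engineOp);              // part 2: index the engine op
            stats.incrementRecoveredOperations();    // part 3: bookkeeping
        }
    }

    public static void main(String[] args) {
        List<String> engine = new ArrayList<>();
        Stats stats = new Stats();
        recover(Arrays.asList("index:1", "delete:2"), op -> "engine/" + op, engine::add, stats);
        System.out.println(engine + " recovered=" + stats.recoveredOperations());
    }
}
```

Because the performer only ever sees engine-level operations, the same entry point could in principle serve both recovery and replication callers, which is the unification the description hints at.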

@ywelsch ywelsch requested a review from bleskes May 24, 2017 12:26
@ywelsch ywelsch force-pushed the enhance/move-recoveryperformer-out-of-engine-config branch from 945a7d7 to b0ad489 Compare May 24, 2017 15:31
@ywelsch ywelsch changed the title from "Move TranslogRecoveryPerformer out of EngineConfig" to "Remove TranslogRecoveryPerformer" May 24, 2017
@ywelsch ywelsch force-pushed the enhance/move-recoveryperformer-out-of-engine-config branch from b0ad489 to ca5b9f9 Compare May 24, 2017 15:37
@bleskes bleskes left a comment

I'm +0 on this (with one exception, see comment) and OK with merging it. @s1monw, since you have designed the original scheme, I think it's good if you look as well.

/**
* Returns statistics object for the translog. Used during translog recovery, see also {@link Engine#recoverFromTranslog()}
*/
public RecoveryState.Translog getTranslogStats() {
Contributor

nit: call this (and derivatives) getRecoveryTranslogStats?

for (Translog.Operation op : operations) {
Engine.Operation engineOp = indexShard().convertToEngineOp(op, Engine.Operation.Origin.PEER_RECOVERY);
if (engineOp instanceof Engine.Index && ((Engine.Index) engineOp).parsedDoc().dynamicMappingsUpdate() != null) {
translog.decrementRecoveredOperations(completedOps); // clean-up stats
Contributor

I think this is a tricky place to put this: it doesn't really know what the retry semantics are (namely, that we always retry a full batch). This is why we had the BatchOperationException. If we want to remove it from the "official exception list" (+1 on that), we can still make BatchOperationException a dedicated non-ElasticsearchException by always rethrowing its cause.

Contributor

can we also have mapping updates for deletes? I wonder if that is the case now that we allow type introduction for deletes too?!

Contributor

I think the mapping is tricky too. I am not sure whether we can hit it because of a broken mapping or something similar, in which case we should fail the recovery. Maybe we can use DelayRecoveryException for this purpose instead? It's really nothing but a delay.

Contributor Author

I've changed the flow of this method to first do the conversion for all the operations in the batch and then only proceed with the actual indexing once we've confirmed that there were no mapping updates. This makes the BatchOperationException obsolete.

@s1monw yes, the method TransportShardBulkAction.executeDeleteRequestOnReplica currently uses different mapping conditions than our recovery code here. The main motivation for this PR was to change some of the internal APIs to address the divergence between recovery and replication code. We can fix the actual divergences in a follow-up.
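For readers following along, the convert-first flow described in this reply can be sketched roughly as below. This is a simplified illustration under stated assumptions, not the code from the PR: `ConvertThenIndexSketch`, `TranslogOp`, and `MappingPendingException` are hypothetical stand-ins, and the "engine" is just a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ConvertThenIndexSketch {
    /** Hypothetical stand-in for a translog operation. */
    public static final class TranslogOp {
        final String id;
        final boolean needsMappingUpdate;
        public TranslogOp(String id, boolean needsMappingUpdate) {
            this.id = id;
            this.needsMappingUpdate = needsMappingUpdate;
        }
    }

    /** Hypothetical signal that the whole batch should be retried later. */
    public static final class MappingPendingException extends RuntimeException {
        public MappingPendingException(String message) { super(message); }
    }

    /** Returns the number of operations indexed; throws before indexing anything if a mapping is missing. */
    public static int indexBatch(List<TranslogOp> batch, List<String> engine) {
        // Phase 1: convert every operation and check for mapping updates; nothing is indexed yet.
        List<String> engineOps = new ArrayList<>();
        for (TranslogOp op : batch) {
            if (op.needsMappingUpdate) {
                throw new MappingPendingException("mapping update pending for op " + op.id);
            }
            engineOps.add(op.id);
        }
        // Phase 2: only now touch the engine, so a failed batch leaves no partial state behind.
        engine.addAll(engineOps);
        return engineOps.size();
    }

    public static void main(String[] args) {
        List<String> engine = new ArrayList<>();
        try {
            indexBatch(Arrays.asList(new TranslogOp("1", false), new TranslogOp("2", true)), engine);
        } catch (MappingPendingException e) {
            System.out.println("retry later, engine untouched: " + engine.isEmpty());
        }
    }
}
```

Since the check runs before any operation is applied, there are no completed operations to roll back in the statistics, which is why a batch-level exception carrying a completed-operations count is no longer needed in this shape.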

bleskes commented May 26, 2017

the latest commit pushed me over to +1...

@ywelsch ywelsch requested a review from s1monw May 30, 2017 07:50
@s1monw s1monw left a comment

At first I got excited: I thought @ywelsch had found a way to encapsulate the translog recovery entirely inside the engine... Well, I am still excited, since it's moving things into the right places IMO. I also like getting rid of another exception. I left some suggestions. Thanks @ywelsch for cleaning things up.

@@ -375,12 +378,26 @@ public void ensureClusterStateVersion(long clusterStateVersion) {
     }

     @Override
-    public long indexTranslogOperations(
-            List<Translog.Operation> operations, int totalTranslogOps) throws TranslogRecoveryPerformer.BatchOperationException {
+    public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws MapperException, IOException {
Contributor

any chance we can get a unit test for this?

Contributor Author

There is already extensive test coverage for this (e.g. test subclasses of ESIndexLevelReplicationTestCase).

exception);
final RecoveryState.Translog translog = recoveryTarget.state().getTranslog();
translog.decrementRecoveredOperations(exception.completedOperations()); // do the maintainance and rollback competed ops
logger.trace("delaying recovery due to missing mapping changes", exception);
Contributor

I am leaning towards making this a debug statement or removing it entirely. I really think it's worth a debug statement.

Contributor Author

ok, I've changed it to "debug" level in 6535937

return mapperService.documentMapperWithAutoCreate(type); // protected for testing
}

public Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) {
Contributor

this class seems simple enough to be unit tested? Maybe we can add a test based on IndexShardTestCase.

Contributor Author

added in 0d2e800

return translogOpToEngineOpConverter.convertToEngineOp(operation, origin);
}

private int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOException {
Contributor

any chance we can add a test for this to IndexShardTests?

Contributor Author

added in 658b889

@@ -552,6 +554,34 @@ private IndexShardState changeState(IndexShardState newState, String reason) {
return new Engine.Index(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry);
}

public Engine.Result applyOperation(Engine.Operation operation) throws IOException {
Contributor

please add a javadoc to this.

Contributor Author

added in 61f17e7

@ywelsch ywelsch requested review from s1monw and bleskes June 6, 2017 15:48
@s1monw s1monw left a comment

LGTM

@bleskes bleskes left a comment

Thx @ywelsch

exception);
final RecoveryState.Translog translog = recoveryTarget.state().getTranslog();
translog.decrementRecoveredOperations(exception.completedOperations()); // do the maintainance and rollback competed ops
logger.debug("delaying recovery due to missing mapping changes", exception);
Contributor

unrelated, but can this still happen today? do we want to assert here?

Contributor Author

I think this can still happen (although very rarely). In theory, we could avoid this by doing something similar to calling recoveryTarget.ensureClusterStateVersion(currentClusterStateVersion) before phase2. The tricky bit is that the current primary might have indexed something based on a mapping change that has not been fully applied yet (i.e. it is being applied, but is not yet available under ClusterService.state()). In this case we would rather want to know about the pre-applied state.

throw new IndexShardNotRecoveringException(shardId, indexShard().state());
}
// first convert all translog operations to engine operations to check for mapping updates
List<Engine.Operation> engineOps = operations.stream().map(
Contributor

+1

@ywelsch ywelsch merged commit 26ec891 into elastic:master Jun 7, 2017
ywelsch commented Jun 7, 2017

thanks @bleskes @s1monw. Special thanks for the merge conflicts @bleskes (2x).
