Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancelling a peer recovery on the source can leak a primary permit #30318

Merged
merged 2 commits into from
May 2, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.ArrayUtil;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
Expand All @@ -44,6 +43,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.core.internal.io.Streams;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.RecoveryEngineException;
Expand All @@ -67,6 +68,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -142,7 +144,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
}
assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
}, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ");
}, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ", shard, cancellableThreads);

try (Closeable ignored = shard.acquireTranslogRetentionLock()) {
final long startingSeqNo;
Expand Down Expand Up @@ -196,7 +198,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
* all documents up to maxSeqNo in phase2.
*/
runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),
shardId + " initiating tracking of " + request.targetAllocationId());
shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads);

final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
/*
Expand Down Expand Up @@ -227,17 +229,38 @@ private boolean isTargetSameHistory() {
return targetHistoryUUID != null && targetHistoryUUID.equals(shard.getHistoryUUID());
}

private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable, String reason) {
static void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable, String reason,
IndexShard primary, CancellableThreads cancellableThreads) {
cancellableThreads.execute(() -> {
final PlainActionFuture<Releasable> onAcquired = new PlainActionFuture<>();
shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason);
try (Releasable ignored = onAcquired.actionGet()) {
CompletableFuture<Releasable> permit = new CompletableFuture<>();
final ActionListener<Releasable> onAcquired = new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
if (permit.complete(releasable) == false) {
releasable.close();
}
}

@Override
public void onFailure(Exception e) {
permit.completeExceptionally(e);
}
};
primary.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason);
try (Releasable ignored = FutureUtils.get(permit)) {
// check that the IndexShard still has the primary authority. This needs to be checked under operation permit to prevent
// races, as IndexShard will switch its authority only when it holds all operation permits, see IndexShard.relocated()
if (shard.isPrimaryMode() == false) {
throw new IndexShardRelocatedException(shard.shardId());
if (primary.isPrimaryMode() == false) {
throw new IndexShardRelocatedException(primary.shardId());
}
runnable.run();
} finally {
// just in case we got an exception (likely interrupted) while waiting for the get
permit.whenComplete((r, e) -> {
if (r != null) {
r.close();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do anything with the exception, like log it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good question. I think in all likely hood the exception will already bubble up through the get. I can log it in trace (and wire in a logger).

});
}
});
}
Expand Down Expand Up @@ -489,11 +512,11 @@ public void finalizeRecovery(final long targetLocalCheckpoint) throws IOExceptio
* the permit then the state of the shard will be relocated and this recovery will fail.
*/
runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint),
shardId + " marking " + request.targetAllocationId() + " as in sync");
shardId + " marking " + request.targetAllocationId() + " as in sync", shard, cancellableThreads);
final long globalCheckpoint = shard.getGlobalCheckpoint();
cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint));
runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
shardId + " updating " + request.targetAllocationId() + "'s global checkpoint");
shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads);

if (request.isPrimaryRelocation()) {
logger.trace("performing relocation hand-off");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings;
Expand Down Expand Up @@ -439,6 +440,30 @@ long phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo,
assertFalse(phase2Called.get());
}

public void testCancellationsDoesNotLeakPrimaryPermits() throws Exception {
final CancellableThreads cancellableThreads = new CancellableThreads();
final IndexShard shard = mock(IndexShard.class);
final AtomicBoolean freed = new AtomicBoolean(true);
when(shard.isPrimaryMode()).thenReturn(true);
doAnswer(invocation -> {
freed.set(false);
((ActionListener<Releasable>)invocation.getArguments()[0]).onResponse(() -> freed.set(true));
return null;
}).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject());

Thread cancelingThread = new Thread(() -> cancellableThreads.cancel("test"));
cancelingThread.start();
try {
RecoverySourceHandler.runUnderPrimaryPermit(() -> {}, "test", shard, cancellableThreads);
} catch (CancellableThreads.ExecutionCancelledException e) {
// expected.
}
cancelingThread.join();
// we have to use assert busy as we may be interrupted while acquiring the permit, if so we want to check
// that the permit is released.
assertBusy(() -> assertTrue(freed.get()));
}

private Store newStore(Path path) throws IOException {
return newStore(path, true);
}
Expand Down