Fix race between replica reset and primary promotion #32442

Merged Aug 3, 2018 · 19 commits
Changes from 2 commits
78 changes: 41 additions & 37 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -473,6 +473,8 @@ public void updateShardState(final ShardRouting newRouting,
TimeUnit.MINUTES,
() -> {
shardStateUpdated.await();
assert primaryTerm == newPrimaryTerm :
"shard term changed on primary. expected [" + newPrimaryTerm + "] but was [" + primaryTerm + "]";
Contributor: 💯 Can you please add the shard routing, so we're sure to know where it came from?

try {
/*
* If this shard was serving as a replica shard when another shard was promoted to primary then the state of
@@ -2216,10 +2218,11 @@ public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final
final Object debugInfo) {
verifyNotClosed();
verifyReplicationTarget();
final boolean globalCheckpointUpdated;
if (operationPrimaryTerm > primaryTerm) {
synchronized (primaryTermMutex) {
Contributor: I wonder if we can remove this and only lock the mutex at this level (it's always good to avoid multiple locks if possible).

Contributor (author): I'm fine relaxing it and using mutex, as we now use asyncBlockOperations. This means that, in practice, at most the number of indexing threads will block on this (while possibly a concurrent cluster state update comes in, trying to acquire mutex as well). The first indexing thread will increase pendingPrimaryTerm, and all the other threads blocked on mutex will just acquire it and do a quick no-op. All subsequent writes will not acquire the mutex anymore, as they bypass the pre-flight check.
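The scheme the author describes can be reduced to a minimal standalone sketch. This is hypothetical illustration code, not the actual IndexShard implementation; the names `pendingPrimaryTerm`, `mutex`, and `maybeBump` follow the discussion above, and the async block submission is abstracted as a `Runnable`:

```java
// Double-checked term bump: a cheap volatile pre-flight check, then a recheck
// under the mutex. Threads that lose the race do a quick no-op under the lock;
// later operations with an equal or older term never touch the mutex at all.
final class TermBumper {
    private final Object mutex = new Object();
    private volatile long pendingPrimaryTerm = 1;

    long term() {
        return pendingPrimaryTerm;
    }

    void maybeBump(long opTerm, Runnable submitAsyncBlock) {
        if (opTerm > pendingPrimaryTerm) {            // pre-flight check, no lock taken
            synchronized (mutex) {
                if (opTerm > pendingPrimaryTerm) {    // recheck under the mutex
                    submitAsyncBlock.run();           // stand-in for asyncBlockOperations(...)
                    pendingPrimaryTerm = opTerm;      // expose the new term last
                }
            }
        }
    }
}
```

The key property is that only the first thread for a given new term submits the block; everyone else either no-ops under the mutex or skips it entirely via the volatile read.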

if (operationPrimaryTerm > primaryTerm) {
verifyNotClosed();
Contributor: I'm wondering: why did you have to add this?

Contributor (author): I did not have to (i.e. there was no failing test). I just saw that we were not rechecking this condition after possibly waiting for a while on primaryTermMutex. The next check, two lines below, will also fail this with an IndexShardNotStartedException; I found it nicer, though, to throw an IndexShardClosedException where possible.

Contributor (author): On second thought, this is less of an issue after I converted blockOperations to asyncBlockOperations in acquireReplicaOperationPermit. I'm going to revert.


IndexShardState shardState = state();
// only roll translog and update primary term if shard has made it past recovery
// Having a new primary term here means that the old primary failed and that there is a new primary, which again
@@ -2229,38 +2232,41 @@ public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final
shardState != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(shardId, shardState);
}
try {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
assert operationPrimaryTerm > primaryTerm :
"shard term already update. op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]";

synchronized (mutex) {
final CountDownLatch termUpdated = new CountDownLatch(1);
Contributor: I wonder if we should have a method called setPrimaryTerm which gets a primary term plus a runnable to run under the async block. That method would be called both from here and from updateShardState, making sure that the semantics of exposing the primary term (after submitting the async block, with assertions that we hold the mutex) are the same.

Contributor (author): I've given this a try in 70262d7.

if (operationPrimaryTerm > primaryTerm) {
indexShardOperationPermits.asyncBlockOperations(30, TimeUnit.MINUTES, () -> {
termUpdated.await();
// a primary promotion, or another primary term transition, might have been triggered concurrently to this
// recheck under the operation permit if we can skip doing this work
if (operationPrimaryTerm == primaryTerm) {
Contributor: we can assert that operationPrimaryTerm is always <= primaryTerm here.

updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
final long currentGlobalCheckpoint = getGlobalCheckpoint();
final long localCheckpoint;
if (currentGlobalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) {
localCheckpoint = SequenceNumbers.NO_OPS_PERFORMED;
} else {
localCheckpoint = currentGlobalCheckpoint;
}
logger.trace(
"detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]",
operationPrimaryTerm,
getLocalCheckpoint(),
localCheckpoint);
getEngine().resetLocalCheckpoint(localCheckpoint);
getEngine().rollTranslogGeneration();
} else {
logger.trace("a primary promotion or concurrent primary term transition has made this reset obsolete");
}
}, e -> failShard("exception during primary term transition", e));

primaryTerm = operationPrimaryTerm;
updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
final long currentGlobalCheckpoint = getGlobalCheckpoint();
final long localCheckpoint;
if (currentGlobalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) {
localCheckpoint = SequenceNumbers.NO_OPS_PERFORMED;
} else {
localCheckpoint = currentGlobalCheckpoint;
}
logger.trace(
"detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]",
operationPrimaryTerm,
getLocalCheckpoint(),
localCheckpoint);
getEngine().resetLocalCheckpoint(localCheckpoint);
getEngine().rollTranslogGeneration();
});
globalCheckpointUpdated = true;
} catch (final Exception e) {
onPermitAcquired.onFailure(e);
return;
termUpdated.countDown();
}
}
} else {
globalCheckpointUpdated = false;
}
}
} else {
globalCheckpointUpdated = false;
Contributor: +1 to removing this.

}

assert operationPrimaryTerm <= primaryTerm
@@ -2279,14 +2285,12 @@ public void onResponse(final Releasable releasable) {
primaryTerm);
onPermitAcquired.onFailure(new IllegalStateException(message));
} else {
if (globalCheckpointUpdated == false) {
try {
updateGlobalCheckpointOnReplica(globalCheckpoint, "operation");
} catch (Exception e) {
releasable.close();
onPermitAcquired.onFailure(e);
return;
}
try {
updateGlobalCheckpointOnReplica(globalCheckpoint, "operation");
} catch (Exception e) {
releasable.close();
onPermitAcquired.onFailure(e);
return;
}
onPermitAcquired.onResponse(releasable);
}
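The core of the race fix in this file can be reduced to a standalone sketch. This is a hypothetical simplification, not the real IndexShard code: the async permit block is modeled as a plain thread, and the local-checkpoint reset plus translog roll are collapsed into a boolean. What it shows is the ordering the diff establishes — the block is submitted first, the new term is exposed, and only then is the block released, at which point it rechecks the term so a newer concurrent transition makes the reset a no-op:

```java
import java.util.concurrent.CountDownLatch;

final class TermTransitionSketch {
    private volatile long primaryTerm = 1;
    private boolean resetPerformed = false;

    void bumpTerm(long operationPrimaryTerm) throws InterruptedException {
        final CountDownLatch termUpdated = new CountDownLatch(1);
        // Stand-in for indexShardOperationPermits.asyncBlockOperations(...)
        Thread asyncBlock = new Thread(() -> {
            try {
                termUpdated.await();            // wait until the new term is visible
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            // Recheck under the block: a newer term transition may have raced us.
            if (operationPrimaryTerm == primaryTerm) {
                resetPerformed = true;          // stand-in for resetLocalCheckpoint + rollTranslogGeneration
            }
        });
        asyncBlock.start();
        primaryTerm = operationPrimaryTerm;     // expose the term, then release the block
        termUpdated.countDown();
        asyncBlock.join();                      // the real code does not join; this keeps the sketch deterministic
    }

    boolean resetPerformed() { return resetPerformed; }
    long term() { return primaryTerm; }
}
```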
@@ -33,6 +33,7 @@

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -59,7 +60,7 @@ final class IndexShardOperationPermits implements Closeable {
final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); // fair to ensure a blocking thread is not starved
private final List<DelayedOperation> delayedOperations = new ArrayList<>(); // operations that are delayed
private volatile boolean closed;
private boolean delayed; // does not need to be volatile as all accesses are done under a lock on this
private int queuedBlockOperations; // does not need to be volatile as all accesses are done under a lock on this

// only valid when assertions are enabled. Key is AtomicBoolean associated with each permit to ensure close once semantics.
// Value is a tuple, with a some debug information supplied by the caller and a stack trace of the acquiring thread
@@ -102,9 +103,6 @@ <E extends Exception> void blockOperations(
final long timeout,
final TimeUnit timeUnit,
final CheckedRunnable<E> onBlocked) throws InterruptedException, TimeoutException, E {
if (closed) {
throw new IndexShardClosedException(shardId);
}
delayOperations();
try {
doBlockOperations(timeout, timeUnit, onBlocked);
@@ -147,13 +145,12 @@ public void onAfter() {
}

private void delayOperations() {
if (closed) {
throw new IndexShardClosedException(shardId);
}
synchronized (this) {
if (delayed) {
throw new IllegalStateException("operations are already delayed");
} else {
assert delayedOperations.isEmpty();
delayed = true;
}
assert queuedBlockOperations > 0 || delayedOperations.isEmpty();
queuedBlockOperations++;
}
}

@@ -164,7 +161,7 @@ private <E extends Exception> void doBlockOperations(
if (Assertions.ENABLED) {
// since delayed is not volatile, we have to synchronize even here for visibility
synchronized (this) {
assert delayed;
assert queuedBlockOperations > 0;
}
}
if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) {
@@ -182,10 +179,14 @@ private void releaseDelayedOperations() {
private void releaseDelayedOperations() {
final List<DelayedOperation> queuedActions;
synchronized (this) {
Contributor (@bleskes, Jul 28, 2018): I think we can pull this up to the method level. I don't see a reason to drain the queue and then release the queue lock, and it will simplify the reasoning a bit.

Contributor (author): I'm not sure what you mean. What would you change?

Contributor (@bleskes): I updated my comment. I mean: make this entire method synchronized.

Contributor (author): I find the reasoning simpler here if we don't extend the mutex to a section of the code that it does not need to cover. Are you OK keeping it as is?

Contributor (@bleskes): Yes, I'm OK. It's subjective.

assert delayed;
queuedActions = new ArrayList<>(delayedOperations);
delayedOperations.clear();
delayed = false;
assert queuedBlockOperations > 0;
queuedBlockOperations--;
if (queuedBlockOperations == 0) {
queuedActions = new ArrayList<>(delayedOperations);
delayedOperations.clear();
} else {
queuedActions = Collections.emptyList();
}
}
if (!queuedActions.isEmpty()) {
/*
@@ -242,7 +243,7 @@ private void acquire(final ActionListener<Releasable> onAcquired, final String e
final Releasable releasable;
try {
synchronized (this) {
if (delayed) {
if (queuedBlockOperations > 0) {
final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
final ActionListener<Releasable> wrappedListener;
if (executorOnDelay != null) {
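The change from a `delayed` boolean to a `queuedBlockOperations` counter in the file above can be sketched in isolation. This is a hypothetical single-class skeleton (the real IndexShardOperationPermits also manages a semaphore, thread pool, and listener contexts): each block registers itself, incoming operations are queued while any block is outstanding, and the queue is drained only when the last block completes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class BlockCounter {
    private final List<Runnable> delayedOperations = new ArrayList<>();
    private int queuedBlockOperations; // guarded by "this"

    synchronized void delayOperations() {
        assert queuedBlockOperations > 0 || delayedOperations.isEmpty();
        queuedBlockOperations++;
    }

    // Returns the operations to run; the caller runs them OUTSIDE the lock.
    synchronized List<Runnable> releaseDelayedOperations() {
        assert queuedBlockOperations > 0;
        queuedBlockOperations--;
        if (queuedBlockOperations == 0) {
            List<Runnable> queued = new ArrayList<>(delayedOperations);
            delayedOperations.clear();
            return queued;
        }
        return Collections.emptyList(); // other blocks still outstanding: keep queueing
    }

    synchronized void acquire(Runnable op) {
        if (queuedBlockOperations > 0) {
            delayedOperations.add(op);  // delayed while any block is queued
        } else {
            op.run();
        }
    }
}
```

Draining into a local list and invoking the callbacks after leaving the lock mirrors the design choice debated in the review thread above: the mutex covers only the queue state, never user callbacks.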
@@ -59,6 +59,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

@@ -234,6 +235,53 @@ public void testConflictingOpsOnReplica() throws Exception {
}
}

public void testReplicaTermIncrementWithConcurrentPrimaryPromotion() throws Exception {
Map<String, String> mappings =
Collections.singletonMap("type", "{ \"type\": { \"properties\": { \"f\": { \"type\": \"keyword\"} }}}");
try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(2, mappings))) {
shards.startAll();
long primaryPrimaryTerm = shards.getPrimary().getPrimaryTerm();
List<IndexShard> replicas = shards.getReplicas();
IndexShard replica1 = replicas.get(0);
IndexShard replica2 = replicas.get(1);

shards.promoteReplicaToPrimary(replica1, (shard, listener) -> {});
long newReplica1Term = replica1.getPrimaryTerm();
assertEquals(primaryPrimaryTerm + 1, newReplica1Term);

assertEquals(primaryPrimaryTerm, replica2.getPrimaryTerm());

IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "1").source("{ \"f\": \"1\"}", XContentType.JSON);
BulkShardRequest replicationRequest = indexOnPrimary(indexRequest, replica1);

CyclicBarrier barrier = new CyclicBarrier(2);
Thread t1 = new Thread(() -> {
try {
barrier.await();
indexOnReplica(replicationRequest, shards, replica2, newReplica1Term);
} catch (IllegalStateException ise) {
assertThat(ise.getMessage(), containsString("is too old"));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
Thread t2 = new Thread(() -> {
try {
barrier.await();
shards.promoteReplicaToPrimary(replica2).get();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
t2.start();
t1.start();
t1.join();
t2.join();

assertEquals(newReplica1Term + 1, replica2.getPrimaryTerm());
}
}

/**
* test document failures (failures after seq_no generation) are added as noop operation to the translog
* for primary and replica shards
@@ -71,7 +71,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
public static void setupThreadPool() {
int writeThreadPoolSize = randomIntBetween(1, 2);
int writeThreadPoolQueueSize = randomIntBetween(1, 2);
threadPool = new TestThreadPool("IndexShardOperationsLockTests",
threadPool = new TestThreadPool("IndexShardOperationPermitsTests",
Settings.builder()
.put("thread_pool." + ThreadPool.Names.WRITE + ".size", writeThreadPoolSize)
.put("thread_pool." + ThreadPool.Names.WRITE + ".queue_size", writeThreadPoolQueueSize)
@@ -100,7 +100,7 @@ public void checkNoInflightOperations() {
assertThat(permits.getActiveOperationsCount(), equalTo(0));
}

public void testAllOperationsInvoked() throws InterruptedException, TimeoutException, ExecutionException {
public void testAllOperationsInvoked() throws InterruptedException, TimeoutException {
int numThreads = 10;

class DummyException extends RuntimeException {}
@@ -187,18 +187,20 @@ public void testOperationsInvokedImmediatelyIfNoBlock() throws ExecutionExceptio
future.get().close();
}

public void testOperationsIfClosed() throws ExecutionException, InterruptedException {
public void testOperationsIfClosed() {
PlainActionFuture<Releasable> future = new PlainActionFuture<>();
permits.close();
permits.acquire(future, ThreadPool.Names.GENERIC, true, "");
ExecutionException exception = expectThrows(ExecutionException.class, future::get);
assertThat(exception.getCause(), instanceOf(IndexShardClosedException.class));
}

public void testBlockIfClosed() throws ExecutionException, InterruptedException {
public void testBlockIfClosed() {
permits.close();
expectThrows(IndexShardClosedException.class, () -> permits.blockOperations(randomInt(10), TimeUnit.MINUTES,
() -> { throw new IllegalArgumentException("fake error"); }));
expectThrows(IndexShardClosedException.class, () -> permits.asyncBlockOperations(randomInt(10), TimeUnit.MINUTES,
() -> { throw new IllegalArgumentException("fake error"); }, e -> { throw new AssertionError(e); }));
}

public void testOperationsDelayedIfBlock() throws ExecutionException, InterruptedException, TimeoutException {
@@ -210,6 +212,36 @@ public void testOperationsDelayedIfBlock() throws ExecutionException, Interrupte
future.get(1, TimeUnit.HOURS).close();
}

public void testGetBlockWhenBlocked() throws ExecutionException, InterruptedException, TimeoutException {
PlainActionFuture<Releasable> future = new PlainActionFuture<>();
final CountDownLatch blockAcquired = new CountDownLatch(1);
final CountDownLatch releaseBlock = new CountDownLatch(1);
final AtomicBoolean blocked = new AtomicBoolean();
try (Releasable ignored = blockAndWait()) {
permits.acquire(future, ThreadPool.Names.GENERIC, true, "");

permits.asyncBlockOperations(
30,
TimeUnit.MINUTES,
() -> {
blocked.set(true);
blockAcquired.countDown();
releaseBlock.await();
},
e -> {
throw new RuntimeException(e);
});
assertFalse(blocked.get());
assertFalse(future.isDone());
}
blockAcquired.await();
assertTrue(blocked.get());
assertFalse(future.isDone());
releaseBlock.countDown();

future.get(1, TimeUnit.HOURS).close();
}

/**
* Tests that the ThreadContext is restored when a operation is executed after it has been delayed due to a block
*/
Expand Down
Loading