Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ywelsch committed Jul 30, 2018
1 parent 78f8306 commit 70262d7
Showing 1 changed file with 44 additions and 43 deletions.
87 changes: 44 additions & 43 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
Expand Down Expand Up @@ -474,13 +475,11 @@ public void updateShardState(final ShardRouting newRouting,
if (resyncStarted == false) {
throw new IllegalStateException("cannot start resync while it's already in progress");
}
indexShardOperationPermits.asyncBlockOperations(
30,
TimeUnit.MINUTES,
bumpPrimaryTerm(newPrimaryTerm,
() -> {
shardStateUpdated.await();
assert pendingPrimaryTerm == newPrimaryTerm :
"shard term changed on primary. expected [" + newPrimaryTerm + "] but was [" + pendingPrimaryTerm + "]";
"shard term changed on primary. expected [" + newPrimaryTerm + "] but was [" + pendingPrimaryTerm + "]" +
", current routing: " + currentRouting + ", new routing: " + newRouting;
assert operationPrimaryTerm < newPrimaryTerm;
operationPrimaryTerm = newPrimaryTerm;
try {
Expand Down Expand Up @@ -527,10 +526,8 @@ public void onFailure(Exception e) {
} catch (final AlreadyClosedException e) {
// okay, the index was deleted
}
},
e -> failShard("exception during primary term transition", e));
});
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
pendingPrimaryTerm = newPrimaryTerm;
}
}
// set this last, once we finished updating all internal state.
Expand Down Expand Up @@ -2215,7 +2212,20 @@ public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcq
indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false, debugInfo);
}

private final Object primaryTermMutex = new Object();
private <E extends Exception> void bumpPrimaryTerm(long newPrimaryTerm, final CheckedRunnable<E> onBlocked) {
assert Thread.holdsLock(mutex);
assert newPrimaryTerm > pendingPrimaryTerm;
assert operationPrimaryTerm <= pendingPrimaryTerm;
final CountDownLatch termUpdated = new CountDownLatch(1);
indexShardOperationPermits.asyncBlockOperations(30, TimeUnit.MINUTES, () -> {
assert operationPrimaryTerm <= pendingPrimaryTerm;
onBlocked.run();
assert operationPrimaryTerm <= pendingPrimaryTerm;
},
e -> failShard("exception during primary term transition", e));
pendingPrimaryTerm = newPrimaryTerm;
termUpdated.countDown();
}

/**
* Acquire a replica operation permit whenever the shard is ready for indexing (see
Expand All @@ -2238,10 +2248,8 @@ public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long g
verifyNotClosed();
verifyReplicationTarget();
if (opPrimaryTerm > pendingPrimaryTerm) {
synchronized (primaryTermMutex) {
synchronized (mutex) {
if (opPrimaryTerm > pendingPrimaryTerm) {
verifyNotClosed();

IndexShardState shardState = state();
// only roll translog and update primary term if shard has made it past recovery
// Having a new primary term here means that the old primary failed and that there is a new primary, which again
Expand All @@ -2252,39 +2260,32 @@ public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long g
throw new IndexShardNotStartedException(shardId, shardState);
}

synchronized (mutex) {
final CountDownLatch termUpdated = new CountDownLatch(1);
if (opPrimaryTerm > pendingPrimaryTerm) {
indexShardOperationPermits.asyncBlockOperations(30, TimeUnit.MINUTES, () -> {
termUpdated.await();
// a primary promotion, or another primary term transition, might have been triggered concurrently to this
// recheck under the operation permit if we can skip doing this work
if (opPrimaryTerm == pendingPrimaryTerm) {
assert operationPrimaryTerm < pendingPrimaryTerm;
operationPrimaryTerm = pendingPrimaryTerm;
updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
final long currentGlobalCheckpoint = getGlobalCheckpoint();
final long localCheckpoint;
if (currentGlobalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) {
localCheckpoint = SequenceNumbers.NO_OPS_PERFORMED;
} else {
localCheckpoint = currentGlobalCheckpoint;
}
logger.trace(
"detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]",
opPrimaryTerm,
getLocalCheckpoint(),
localCheckpoint);
getEngine().resetLocalCheckpoint(localCheckpoint);
getEngine().rollTranslogGeneration();
if (opPrimaryTerm > pendingPrimaryTerm) {
bumpPrimaryTerm(opPrimaryTerm, () -> {
// a primary promotion, or another primary term transition, might have been triggered concurrently to this
// recheck under the operation permit if we can skip doing this work
if (opPrimaryTerm == pendingPrimaryTerm) {
assert operationPrimaryTerm < pendingPrimaryTerm;
operationPrimaryTerm = pendingPrimaryTerm;
updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
final long currentGlobalCheckpoint = getGlobalCheckpoint();
final long localCheckpoint;
if (currentGlobalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) {
localCheckpoint = SequenceNumbers.NO_OPS_PERFORMED;
} else {
logger.trace("a primary promotion or concurrent primary term transition has made this reset obsolete");
localCheckpoint = currentGlobalCheckpoint;
}
}, e -> failShard("exception during primary term transition", e));

pendingPrimaryTerm = opPrimaryTerm;
termUpdated.countDown();
}
logger.trace(
"detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]",
opPrimaryTerm,
getLocalCheckpoint(),
localCheckpoint);
getEngine().resetLocalCheckpoint(localCheckpoint);
getEngine().rollTranslogGeneration();
} else {
logger.trace("a primary promotion or concurrent primary term transition has made this reset obsolete");
}
});
}
}
}
Expand Down

0 comments on commit 70262d7

Please sign in to comment.