Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A replica can be promoted and started in one cluster state update #32042

Merged
merged 6 commits into from
Jul 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -413,10 +413,9 @@ public void updateShardState(final ShardRouting newRouting,

if (state == IndexShardState.POST_RECOVERY && newRouting.active()) {
assert currentRouting.active() == false : "we are in POST_RECOVERY, but our shard routing is active " + currentRouting;

if (newRouting.primary() && currentRouting.isRelocationTarget() == false) {
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
}
assert currentRouting.isRelocationTarget() == false || currentRouting.primary() == false ||
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this assertion confuses me: What should it express?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed it's confusing. I tried to clarify by adding the assertion message. If it helps I can flip the boolean around:

(currentRouting.isRelocationTarget() && currentRouting.primary() && replicationTracker.isPrimaryMode() == false) == false 

but I think that might be just as confusing if not more. Let me know.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's keep as is, with the assertion message I think it's ok. I wonder if we should have an assertion at the end of this method to say something like "if we have an active primary shard that's not relocating, then the replication tracker is in primary mode".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good. will add.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added in 3c79d57

replicationTracker.isPrimaryMode() :
"a primary relocation is completed by the master, but primary mode is not active " + currentRouting;

changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
} else if (currentRouting.primary() && currentRouting.relocating() && replicationTracker.isPrimaryMode() == false &&
Expand All @@ -432,7 +431,12 @@ public void updateShardState(final ShardRouting newRouting,
final CountDownLatch shardStateUpdated = new CountDownLatch(1);

if (newRouting.primary()) {
if (newPrimaryTerm != primaryTerm) {
if (newPrimaryTerm == primaryTerm) {
if (currentRouting.initializing() && currentRouting.isRelocationTarget() == false && newRouting.active()) {
// the master started a recovering primary, activate primary mode.
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
}
} else {
assert currentRouting.primary() == false : "term is only increased as part of primary promotion";
/* Note that due to cluster state batching an initializing primary shard term can be failed and re-assigned
 * in one state, causing its term to be incremented. Note that if both current shard state and new
Expand Down Expand Up @@ -521,6 +525,11 @@ public void onFailure(Exception e) {
}
// set this last, once we finished updating all internal state.
this.shardRouting = newRouting;

assert this.shardRouting.primary() == false ||
this.shardRouting.started() == false || // note that we use started and not active to avoid relocating shards
this.replicationTracker.isPrimaryMode()
: "a started primary must be in primary mode " + this.shardRouting;
shardStateUpdated.countDown();
}
if (currentRouting != null && currentRouting.active() == false && newRouting.active()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ public void testSeqNoCollision() throws Exception {
logger.info("--> Promote replica2 as the primary");
shards.promoteReplicaToPrimary(replica2);
logger.info("--> Recover replica3 from replica2");
recoverReplica(replica3, replica2);
recoverReplica(replica3, replica2, true);
try (Translog.Snapshot snapshot = getTranslog(replica3).newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
assertThat(snapshot.next(), equalTo(op2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ public void testPersistenceStateMetadataPersistence() throws Exception {
}

public void testFailShard() throws Exception {
allowShardFailures();
IndexShard shard = newStartedShard();
final ShardPath shardPath = shard.shardPath();
assertNotNull(shardPath);
Expand Down Expand Up @@ -310,7 +311,8 @@ public void testRejectOperationPermitWithHigherTermWhenNotStarted() throws IOExc
}

public void testPrimaryPromotionDelaysOperations() throws IOException, BrokenBarrierException, InterruptedException {
final IndexShard indexShard = newStartedShard(false);
final IndexShard indexShard = newShard(false);
recoveryEmptyReplica(indexShard, randomBoolean());

final int operations = scaledRandomIntBetween(1, 64);
final CyclicBarrier barrier = new CyclicBarrier(1 + operations);
Expand Down Expand Up @@ -354,20 +356,10 @@ public void onFailure(Exception e) {
barrier.await();
latch.await();

// promote the replica
final ShardRouting replicaRouting = indexShard.routingEntry();
final ShardRouting primaryRouting =
newShardRouting(
replicaRouting.shardId(),
replicaRouting.currentNodeId(),
null,
true,
ShardRoutingState.STARTED,
replicaRouting.allocationId());
indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {},
0L, Collections.singleton(primaryRouting.allocationId().getId()),
new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build(),
Collections.emptySet());
promoteReplica(indexShard, Collections.singleton(replicaRouting.allocationId().getId()),
new IndexShardRoutingTable.Builder(replicaRouting.shardId()).addShard(replicaRouting).build());


final int delayedOperations = scaledRandomIntBetween(1, 64);
final CyclicBarrier delayedOperationsBarrier = new CyclicBarrier(1 + delayedOperations);
Expand Down Expand Up @@ -429,8 +421,9 @@ public void onFailure(Exception e) {
* 1) Internal state (ala ReplicationTracker) have been updated
* 2) Primary term is set to the new term
*/
public void testPublishingOrderOnPromotion() throws IOException, BrokenBarrierException, InterruptedException {
final IndexShard indexShard = newStartedShard(false);
public void testPublishingOrderOnPromotion() throws IOException, InterruptedException, BrokenBarrierException {
final IndexShard indexShard = newShard(false);
recoveryEmptyReplica(indexShard, randomBoolean());
final long promotedTerm = indexShard.getPrimaryTerm() + 1;
final CyclicBarrier barrier = new CyclicBarrier(2);
final AtomicBoolean stop = new AtomicBoolean();
Expand All @@ -449,18 +442,10 @@ public void testPublishingOrderOnPromotion() throws IOException, BrokenBarrierEx
});
thread.start();

final ShardRouting replicaRouting = indexShard.routingEntry();
final ShardRouting primaryRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null, true,
ShardRoutingState.STARTED, replicaRouting.allocationId());


final Set<String> inSyncAllocationIds = Collections.singleton(primaryRouting.allocationId().getId());
final IndexShardRoutingTable routingTable =
new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build();
barrier.await();
// promote the replica
indexShard.updateShardState(primaryRouting, promotedTerm, (shard, listener) -> {}, 0L, inSyncAllocationIds, routingTable,
Collections.emptySet());
final ShardRouting replicaRouting = indexShard.routingEntry();
promoteReplica(indexShard, Collections.singleton(replicaRouting.allocationId().getId()),
new IndexShardRoutingTable.Builder(replicaRouting.shardId()).addShard(replicaRouting).build());

stop.set(true);
thread.join();
Expand All @@ -469,7 +454,8 @@ public void testPublishingOrderOnPromotion() throws IOException, BrokenBarrierEx


public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception {
final IndexShard indexShard = newStartedShard(false);
final IndexShard indexShard = newShard(false);
recoveryEmptyReplica(indexShard, randomBoolean());

// most of the time this is large enough that most of the time there will be at least one gap
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
Expand All @@ -480,17 +466,8 @@ public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception {

// promote the replica
final ShardRouting replicaRouting = indexShard.routingEntry();
final ShardRouting primaryRouting =
newShardRouting(
replicaRouting.shardId(),
replicaRouting.currentNodeId(),
null,
true,
ShardRoutingState.STARTED,
replicaRouting.allocationId());
indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {},
0L, Collections.singleton(primaryRouting.allocationId().getId()),
new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build(), Collections.emptySet());
promoteReplica(indexShard, Collections.singleton(replicaRouting.allocationId().getId()),
new IndexShardRoutingTable.Builder(replicaRouting.shardId()).addShard(replicaRouting).build());

/*
* This operation completing means that the delay operation executed as part of increasing the primary term has completed and the
Expand All @@ -507,7 +484,7 @@ public void onResponse(Releasable releasable) {

@Override
public void onFailure(Exception e) {
throw new RuntimeException(e);
throw new AssertionError(e);
}
},
ThreadPool.Names.GENERIC, "");
Expand Down Expand Up @@ -847,7 +824,7 @@ public void testGlobalCheckpointSync() throws IOException {
// add a replica
recoverShardFromStore(primaryShard);
final IndexShard replicaShard = newShard(shardId, false);
recoverReplica(replicaShard, primaryShard);
recoverReplica(replicaShard, primaryShard, true);
final int maxSeqNo = randomIntBetween(0, 128);
for (int i = 0; i <= maxSeqNo; i++) {
EngineTestCase.generateNewSeqNo(primaryShard.getEngine());
Expand Down Expand Up @@ -1626,7 +1603,7 @@ public void testPrimaryHandOffUpdatesLocalCheckpoint() throws IOException {
IndexShardTestCase.updateRoutingEntry(primarySource, primarySource.routingEntry().relocate(randomAlphaOfLength(10), -1));
final IndexShard primaryTarget = newShard(primarySource.routingEntry().getTargetRelocatingShard());
updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetaData());
recoverReplica(primaryTarget, primarySource);
recoverReplica(primaryTarget, primarySource, true);

// check that local checkpoint of new primary is properly tracked after primary relocation
assertThat(primaryTarget.getLocalCheckpoint(), equalTo(totalOps - 1L));
Expand Down Expand Up @@ -2084,7 +2061,7 @@ public long indexTranslogOperations(List<Translog.Operation> operations, int tot
assertFalse(replica.isSyncNeeded());
return localCheckpoint;
}
}, true);
}, true, true);

closeShards(primary, replica);
}
Expand Down Expand Up @@ -2191,7 +2168,7 @@ public long indexTranslogOperations(List<Translog.Operation> operations, int tot
assertTrue(replica.isActive());
return localCheckpoint;
}
}, false);
}, false, true);

closeShards(primary, replica);
}
Expand Down Expand Up @@ -2243,7 +2220,7 @@ public void finalizeRecovery(long globalCheckpoint) throws IOException {
super.finalizeRecovery(globalCheckpoint);
assertListenerCalled.accept(replica);
}
}, false);
}, false, true);

closeShards(primary, replica);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,14 @@ public void updateShardState(ShardRouting shardRouting,
assertTrue("and active shard must stay active, current: " + this.shardRouting + ", got: " + shardRouting,
shardRouting.active());
}
if (this.shardRouting.primary()) {
assertTrue("a primary shard can't be demoted", shardRouting.primary());
} else if (shardRouting.primary()) {
// note: it's ok for a replica in post recovery to be started and promoted at once
// this can happen when the primary failed after we sent the start shard message
assertTrue("a replica can only be promoted when active. current: " + this.shardRouting + " new: " + shardRouting,
shardRouting.active());
}
this.shardRouting = shardRouting;
if (shardRouting.primary()) {
term = newPrimaryTerm;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void testGetStartingSeqNo() throws Exception {
try {
// Empty store
{
recoveryEmptyReplica(replica);
recoveryEmptyReplica(replica, true);
final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(0L));
recoveryTarget.decRef();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ public void testPeerRecoverySendSafeCommitInFileBased() throws Exception {
}
IndexShard replicaShard = newShard(primaryShard.shardId(), false);
updateMappings(replicaShard, primaryShard.indexSettings().getIndexMetaData());
recoverReplica(replicaShard, primaryShard);
recoverReplica(replicaShard, primaryShard, true);
List<IndexCommit> commits = DirectoryReader.listCommits(replicaShard.store().directory());
long maxSeqNo = Long.parseLong(commits.get(0).getUserData().get(SequenceNumbers.MAX_SEQ_NO));
assertThat(maxSeqNo, lessThanOrEqualTo(globalCheckpoint));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardP
RecoverySource.PeerRecoverySource.INSTANCE);

final IndexShard newReplica =
newShard(shardRouting, shardPath, indexMetaData, null, getEngineFactory(shardRouting), () -> {}, EMPTY_EVENT_LISTENER);
newShard(shardRouting, shardPath, indexMetaData, null, getEngineFactory(shardRouting), () -> {}, EMPTY_EVENT_LISTENER);
replicas.add(newReplica);
updateAllocationIDsOnPrimary();
return newReplica;
Expand Down Expand Up @@ -341,8 +341,11 @@ public void recoverReplica(
IndexShard replica,
BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
boolean markAsRecovering) throws IOException {
ESIndexLevelReplicationTestCase.this.recoverReplica(replica, primary, targetSupplier, markAsRecovering, activeIds(),
routingTable(Function.identity()));
final IndexShardRoutingTable routingTable = routingTable(Function.identity());
final Set<String> inSyncIds = activeIds();
ESIndexLevelReplicationTestCase.this.recoverUnstartedReplica(replica, primary, targetSupplier, markAsRecovering, inSyncIds,
routingTable);
ESIndexLevelReplicationTestCase.this.startReplicaAfterRecovery(replica, primary, inSyncIds, routingTable);
}

public synchronized DiscoveryNode getPrimaryNode() {
Expand Down
Loading