Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignore shard started requests when primary term does not match #37899

Merged
merged 5 commits into from
Jan 29, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -494,12 +494,20 @@ public int hashCode() {
}
}

public void shardStarted(final ShardRouting shardRouting, final String message, Listener listener) {
shardStarted(shardRouting, message, listener, clusterService.state());
public void shardStarted(final ShardRouting shardRouting,
final long primaryTerm,
final String message,
final Listener listener) {
shardStarted(shardRouting, primaryTerm, message, listener, clusterService.state());
}
public void shardStarted(final ShardRouting shardRouting, final String message, Listener listener, ClusterState currentState) {
StartedShardEntry shardEntry = new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), message);
sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, shardEntry, listener);

public void shardStarted(final ShardRouting shardRouting,
final long primaryTerm,
final String message,
final Listener listener,
final ClusterState currentState) {
StartedShardEntry entry = new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), primaryTerm, message);
sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, entry, listener);
}

private static class ShardStartedTransportHandler implements TransportRequestHandler<StartedShardEntry> {
Expand Down Expand Up @@ -544,7 +552,7 @@ public ClusterTasksResult<StartedShardEntry> execute(ClusterState currentState,
List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<>(tasks.size());
Set<ShardRouting> seenShardRoutings = new HashSet<>(); // to prevent duplicates
for (StartedShardEntry task : tasks) {
ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId);
final ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId);
if (matched == null) {
// tasks that correspond to non-existent shards are marked as successful. The reason is that we resend shard started
// events on every cluster state publishing that does not contain the shard as started yet. This means that old stale
Expand All @@ -553,6 +561,19 @@ public ClusterTasksResult<StartedShardEntry> execute(ClusterState currentState,
logger.debug("{} ignoring shard started task [{}] (shard does not exist anymore)", task.shardId, task);
builder.success(task);
} else {
if (matched.primary() && task.primaryTerm > 0) {
final IndexMetaData indexMetaData = currentState.metaData().index(task.shardId.getIndex());
assert indexMetaData != null;
final long currentPrimaryTerm = indexMetaData.primaryTerm(task.shardId.id());
if (currentPrimaryTerm != task.primaryTerm) {
assert currentPrimaryTerm > task.primaryTerm : "received a primary term with a higher term than in the " +
"current cluster state (received [" + task.primaryTerm + "] but current is [" + currentPrimaryTerm + "])";
logger.debug("{} ignoring shard started task [{}] (primary term {} does not match current term {})",
task.shardId, task, task.primaryTerm, currentPrimaryTerm);
builder.success(task);
continue;
}
}
if (matched.initializing() == false) {
assert matched.active() : "expected active shard routing for task " + task + " but found " + matched;
// same as above, this might have been a stale in-flight request, so we just ignore.
Expand Down Expand Up @@ -597,15 +618,20 @@ public void onFailure(String source, Exception e) {
public static class StartedShardEntry extends TransportRequest {
final ShardId shardId;
final String allocationId;
final long primaryTerm;
final String message;

StartedShardEntry(StreamInput in) throws IOException {
super(in);
shardId = ShardId.readShardId(in);
allocationId = in.readString();
if (in.getVersion().before(Version.V_6_3_0)) {
final long primaryTerm = in.readVLong();
primaryTerm = in.readVLong();
assert primaryTerm == UNASSIGNED_PRIMARY_TERM : "shard is only started by itself: primary term [" + primaryTerm + "]";
} else if (in.getVersion().onOrAfter(Version.V_7_0_0)) { // TODO update version to 6.7.0 after backport
primaryTerm = in.readVLong();
} else {
primaryTerm = UNASSIGNED_PRIMARY_TERM;
}
this.message = in.readString();
if (in.getVersion().before(Version.V_6_3_0)) {
Expand All @@ -614,9 +640,10 @@ public static class StartedShardEntry extends TransportRequest {
}
}

public StartedShardEntry(ShardId shardId, String allocationId, String message) {
public StartedShardEntry(final ShardId shardId, final String allocationId, final long primaryTerm, final String message) {
this.shardId = shardId;
this.allocationId = allocationId;
this.primaryTerm = primaryTerm;
this.message = message;
}

Expand All @@ -627,6 +654,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(allocationId);
if (out.getVersion().before(Version.V_6_3_0)) {
out.writeVLong(0L);
} else if (out.getVersion().onOrAfter(Version.V_7_0_0)) { // TODO update version to 6.7.0 after backport
out.writeVLong(primaryTerm);
}
out.writeString(message);
if (out.getVersion().before(Version.V_6_3_0)) {
Expand All @@ -636,8 +665,8 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public String toString() {
return String.format(Locale.ROOT, "StartedShardEntry{shardId [%s], allocationId [%s], message [%s]}",
shardId, allocationId, message);
return String.format(Locale.ROOT, "StartedShardEntry{shardId [%s], allocationId [%s], primary term [%d], message [%s]}",
shardId, allocationId, primaryTerm, message);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,13 +575,14 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR
}

try {
logger.debug("{} creating shard", shardRouting.shardId());
final long primaryTerm = state.metaData().index(shardRouting.index()).primaryTerm(shardRouting.id());
logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm);
RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode);
indicesService.createShard(
shardRouting,
recoveryState,
recoveryTargetService,
new RecoveryListener(shardRouting),
new RecoveryListener(shardRouting, primaryTerm),
repositoriesService,
failedShardHandler,
globalCheckpointSyncer,
Expand All @@ -598,9 +599,10 @@ private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard
"local shard has a different allocation id but wasn't cleaning by removeShards. "
+ "cluster state: " + shardRouting + " local: " + currentRoutingEntry;

final long primaryTerm;
try {
final IndexMetaData indexMetaData = clusterState.metaData().index(shard.shardId().getIndex());
final long primaryTerm = indexMetaData.primaryTerm(shard.shardId().id());
primaryTerm = indexMetaData.primaryTerm(shard.shardId().id());
final Set<String> inSyncIds = indexMetaData.inSyncAllocationIds(shard.shardId().id());
final IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId());
final Set<String> pre60AllocationIds = indexShardRoutingTable.assignedShards()
Expand Down Expand Up @@ -633,7 +635,7 @@ private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard
shardRouting.shardId(), state, nodes.getMasterNode());
}
if (nodes.getMasterNode() != null) {
shardStateAction.shardStarted(shardRouting, "master " + nodes.getMasterNode() +
shardStateAction.shardStarted(shardRouting, primaryTerm, "master " + nodes.getMasterNode() +
" marked shard as initializing, but shard state is [" + state + "], mark shard as started",
SHARD_STATE_ACTION_LISTENER, clusterState);
}
Expand Down Expand Up @@ -673,15 +675,24 @@ private static DiscoveryNode findSourceNodeForPeerRecovery(Logger logger, Routin

private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener {

/**
* ShardRouting with which the shard was created
*/
private final ShardRouting shardRouting;

private RecoveryListener(ShardRouting shardRouting) {
/**
* Primary term with which the shard was created
*/
private final long primaryTerm;
tlrx marked this conversation as resolved.
Show resolved Hide resolved

private RecoveryListener(final ShardRouting shardRouting, final long primaryTerm) {
this.shardRouting = shardRouting;
this.primaryTerm = primaryTerm;
}

@Override
public void onRecoveryDone(RecoveryState state) {
shardStateAction.shardStarted(shardRouting, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
public void onRecoveryDone(final RecoveryState state) {
shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
}

@Override
Expand Down
Loading