Skip to content

Commit

Permalink
TEST: Access cluster state directly in assertSeqNos (#33277)
Browse files Browse the repository at this point in the history
Some AbstractDisruptionTestCase tests start failing since we enabled
assertSeqNos (in #33130). They fail because the assertSeqNos assertion
queries cluster stats while the cluster is disrupted or not formed yet.

This commit switches to use the cluster state and shard stats directly
from the test cluster.

Closes #33251
  • Loading branch information
dnhatn committed Sep 4, 2018
1 parent bfa9199 commit b85cbd1
Showing 1 changed file with 40 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
package org.elasticsearch.test;

import com.carrotsearch.hppc.ObjectLongMap;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.carrotsearch.randomizedtesting.RandomizedContext;
import com.carrotsearch.randomizedtesting.annotations.TestGroup;
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.apache.http.HttpHost;
import org.apache.lucene.search.Sort;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
Expand All @@ -48,10 +51,6 @@
import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
Expand Down Expand Up @@ -187,7 +186,6 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -2329,40 +2327,48 @@ public static Index resolveIndex(String index) {

protected void assertSeqNos() throws Exception {
assertBusy(() -> {
IndicesStatsResponse stats = client().admin().indices().prepareStats().clear().get();
for (IndexStats indexStats : stats.getIndices().values()) {
for (IndexShardStats indexShardStats : indexStats.getIndexShards().values()) {
Optional<ShardStats> maybePrimary = Stream.of(indexShardStats.getShards())
.filter(s -> s.getShardRouting().active() && s.getShardRouting().primary())
.findFirst();
if (maybePrimary.isPresent() == false) {
final ClusterState state = clusterService().state();
for (ObjectObjectCursor<String, IndexRoutingTable> indexRoutingTable : state.routingTable().indicesRouting()) {
for (IntObjectCursor<IndexShardRoutingTable> indexShardRoutingTable : indexRoutingTable.value.shards()) {
ShardRouting primaryShardRouting = indexShardRoutingTable.value.primaryShard();
if (primaryShardRouting == null || primaryShardRouting.assignedToNode() == false) {
continue;
}
ShardStats primary = maybePrimary.get();
final SeqNoStats primarySeqNoStats = primary.getSeqNoStats();
final ShardRouting primaryShardRouting = primary.getShardRouting();
DiscoveryNode primaryNode = state.nodes().get(primaryShardRouting.currentNodeId());
IndexShard primaryShard = internalCluster().getInstance(IndicesService.class, primaryNode.getName())
.indexServiceSafe(primaryShardRouting.index()).getShard(primaryShardRouting.id());
final SeqNoStats primarySeqNoStats;
final ObjectLongMap<String> syncGlobalCheckpoints;
try {
primarySeqNoStats = primaryShard.seqNoStats();
syncGlobalCheckpoints = primaryShard.getInSyncGlobalCheckpoints();
} catch (AlreadyClosedException ex) {
continue; // shard is closed - just ignore
}
assertThat(primaryShardRouting + " should have set the global checkpoint",
primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)));
final DiscoveryNode node = clusterService().state().nodes().get(primaryShardRouting.currentNodeId());
final IndicesService indicesService =
internalCluster().getInstance(IndicesService.class, node.getName());
final IndexShard indexShard = indicesService.getShardOrNull(primaryShardRouting.shardId());
final ObjectLongMap<String> globalCheckpoints = indexShard.getInSyncGlobalCheckpoints();
for (ShardStats shardStats : indexShardStats) {
final SeqNoStats seqNoStats = shardStats.getSeqNoStats();
if (seqNoStats == null) {
continue; // this shard was closed
primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)));
for (ShardRouting replicaShardRouting : indexShardRoutingTable.value.replicaShards()) {
if (replicaShardRouting.assignedToNode() == false) {
continue;
}
DiscoveryNode replicaNode = state.nodes().get(replicaShardRouting.currentNodeId());
IndexShard replicaShard = internalCluster().getInstance(IndicesService.class, replicaNode.getName())
.indexServiceSafe(replicaShardRouting.index()).getShard(replicaShardRouting.id());
final SeqNoStats seqNoStats;
try {
seqNoStats = replicaShard.seqNoStats();
} catch (AlreadyClosedException e) {
continue; // shard is closed - just ignore
}
assertThat(shardStats.getShardRouting() + " local checkpoint mismatch",
seqNoStats.getLocalCheckpoint(), equalTo(primarySeqNoStats.getLocalCheckpoint()));
assertThat(shardStats.getShardRouting() + " global checkpoint mismatch",
seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint()));
assertThat(shardStats.getShardRouting() + " max seq no mismatch",
seqNoStats.getMaxSeqNo(), equalTo(primarySeqNoStats.getMaxSeqNo()));
assertThat(replicaShardRouting + " local checkpoint mismatch",
seqNoStats.getLocalCheckpoint(), equalTo(primarySeqNoStats.getLocalCheckpoint()));
assertThat(replicaShardRouting + " global checkpoint mismatch",
seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint()));
assertThat(replicaShardRouting + " max seq no mismatch",
seqNoStats.getMaxSeqNo(), equalTo(primarySeqNoStats.getMaxSeqNo()));
// the local knowledge on the primary of the global checkpoint equals the global checkpoint on the shard
assertThat(
seqNoStats.getGlobalCheckpoint(),
equalTo(globalCheckpoints.get(shardStats.getShardRouting().allocationId().getId())));
assertThat(replicaShardRouting + " global checkpoint syncs mismatch", seqNoStats.getGlobalCheckpoint(),
equalTo(syncGlobalCheckpoints.get(replicaShardRouting.allocationId().getId())));
}
}
}
Expand Down

0 comments on commit b85cbd1

Please sign in to comment.