Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce global checkpoint background sync #26591

Merged
merged 42 commits into from
Sep 21, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
c0e7443
Introduce global checkpoint background sync
jasontedor Aug 28, 2017
511f96e
Cleanup after test
jasontedor Sep 11, 2017
e1dc814
Recollect stats
jasontedor Sep 12, 2017
86ddf79
Merge branch 'master' into global-checkpoint-sync
jasontedor Sep 19, 2017
e0657a7
Add after-op sync
jasontedor Sep 20, 2017
b79b13f
Merge branch 'master' into global-checkpoint-sync
jasontedor Sep 20, 2017
f7a76cd
Iteration
jasontedor Sep 20, 2017
3881d76
Iteration
jasontedor Sep 20, 2017
89bbf84
Remove comments
jasontedor Sep 20, 2017
1082a31
Close it
jasontedor Sep 20, 2017
15453b4
Merge branch 'master' into global-checkpoint-sync
jasontedor Sep 20, 2017
aa0c62c
refresh needed
jasontedor Sep 20, 2017
721f725
remove ensure green
jasontedor Sep 20, 2017
9bc5155
remove background sync test setting
jasontedor Sep 20, 2017
63e9d80
Revert "remove background sync test setting"
jasontedor Sep 21, 2017
6ade720
test iteration
jasontedor Sep 21, 2017
354df1c
disable bwc tests
jasontedor Sep 21, 2017
79742ea
imports
jasontedor Sep 21, 2017
d9fc19d
Formatting of method
jasontedor Sep 21, 2017
afb082f
Remove leftover code
jasontedor Sep 21, 2017
9bf875e
State handling
jasontedor Sep 21, 2017
007d5c4
Logging on failed sync
jasontedor Sep 21, 2017
cb9373b
no fallthrough
jasontedor Sep 21, 2017
7e6d1bf
Setting
jasontedor Sep 21, 2017
f7295ac
Critical fix
jasontedor Sep 21, 2017
f237d88
fix comment
jasontedor Sep 21, 2017
e88b92c
Revert move, add javadocs
jasontedor Sep 21, 2017
030156d
Handle execute future
jasontedor Sep 21, 2017
0343372
Only check in-sync shards
jasontedor Sep 21, 2017
cf4e67b
Test iteration
jasontedor Sep 21, 2017
f82df30
More testing
jasontedor Sep 21, 2017
507806b
refactor shard creation
jasontedor Sep 21, 2017
530addd
handle closed
jasontedor Sep 21, 2017
f3b04dc
assertSeqNos
jasontedor Sep 21, 2017
5030a77
Add disruption test
jasontedor Sep 21, 2017
b640b10
revert formatting changes
jasontedor Sep 21, 2017
cccdec6
checkstyle
jasontedor Sep 21, 2017
d43f794
imports
jasontedor Sep 21, 2017
26e4c76
remove dead code from create index
jasontedor Sep 21, 2017
b80b728
everything in the right place
jasontedor Sep 21, 2017
b8adcce
revert formatting changes
jasontedor Sep 21, 2017
c041ea2
Merge branch 'master' into global-checkpoint-sync
jasontedor Sep 21, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ private void maybeTrimTranslog() {
}
}

private void syncGlobalCheckpoints() {
private void maybeSyncGlobalCheckpoints() {
for (final IndexShard shard : this.shards.values()) {
if (shard.routingEntry().active() && shard.routingEntry().primary()) {
switch (shard.state()) {
Expand Down Expand Up @@ -950,7 +950,7 @@ final class AsyncGlobalCheckpointTask extends BaseAsyncTask {

// Task body: asks the owning index service to sync global checkpoints.
// NOTE(review): this span comes from a rendered diff ("2 additions & 2 deletions"), so the two calls
// below are presumably the pre-rename line and its renamed replacement, not two live calls — confirm
// against the actual file.
@Override
protected void runInternal() {
indexService.syncGlobalCheckpoints();
indexService.maybeSyncGlobalCheckpoints();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ public void testAckedIndexing() throws Exception {
final List<String> nodes = startCluster(rarely() ? 5 : 3);

assertAcked(prepareCreate("test")
.setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
));
.setSettings(Settings.builder()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IDE gone rogue in this file? :D

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I concurrently pushed b640b10. 😄

.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
));
ensureGreen();

ServiceDisruptionScheme disruptionScheme = addRandomDisruptionScheme();
Expand Down Expand Up @@ -142,8 +142,8 @@ public void testAckedIndexing() throws Exception {
exceptedExceptions.add(e);
final String docId = id;
logger.trace(
(Supplier<?>)
() -> new ParameterizedMessage("[{}] failed id [{}] through node [{}]", name, docId, node), e);
(Supplier<?>)
() -> new ParameterizedMessage("[{}] failed id [{}] through node [{}]", name, docId, node), e);
} finally {
countDownLatchRef.get().countDown();
logger.trace("[{}] decreased counter : {}", name, countDownLatchRef.get().getCount());
Expand Down Expand Up @@ -190,12 +190,12 @@ public void testAckedIndexing() throws Exception {
disruptionScheme.stopDisrupting();
for (String node : internalCluster().getNodeNames()) {
ensureStableCluster(nodes.size(), TimeValue.timeValueMillis(disruptionScheme.expectedTimeToHeal().millis() +
DISRUPTION_HEALING_OVERHEAD.millis()), true, node);
DISRUPTION_HEALING_OVERHEAD.millis()), true, node);
}
// in case of a bridge partition, shard allocation can fail "index.allocation.max_retries" times if the master
// is the super-connected node and recovery source and target are on opposite sides of the bridge
if (disruptionScheme instanceof NetworkDisruption &&
((NetworkDisruption) disruptionScheme).getDisruptedLinks() instanceof Bridge) {
((NetworkDisruption) disruptionScheme).getDisruptedLinks() instanceof Bridge) {
assertAcked(client().admin().cluster().prepareReroute().setRetryFailed(true));
}
ensureGreen("test");
Expand All @@ -207,14 +207,16 @@ public void testAckedIndexing() throws Exception {
logger.debug("validating through node [{}] ([{}] acked docs)", node, ackedDocs.size());
for (String id : ackedDocs.keySet()) {
assertTrue("doc [" + id + "] indexed via node [" + ackedDocs.get(id) + "] not found",
client(node).prepareGet("test", "type", id).setPreference("_local").get().isExists());
client(node).prepareGet("test", "type", id).setPreference("_local").get().isExists());
}
} catch (AssertionError | NoShardAvailableActionException e) {
throw new AssertionError(e.getMessage() + " (checked via node [" + node + "]", e);
}
}
}, 30, TimeUnit.SECONDS);

assertSeqNos();

logger.info("done validating (iteration [{}])", iter);
}
} finally {
Expand Down
47 changes: 1 addition & 46 deletions core/src/test/java/org/elasticsearch/recovery/RelocationIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,16 @@
package org.elasticsearch.recovery;

import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.hppc.ObjectLongMap;
import com.carrotsearch.hppc.procedures.IntProcedure;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.util.English;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
Expand All @@ -47,13 +41,10 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
import org.elasticsearch.plugins.Plugin;
Expand Down Expand Up @@ -82,7 +73,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
Expand Down Expand Up @@ -111,42 +101,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
// Hook override: runs the superclass hook first, then verifies that the sequence-number stats of every
// shard copy agree with those of its active primary.
// NOTE(review): this span comes from a rendered diff (the file header reports "1 addition & 46 deletions"),
// so the inline assertBusy block below is presumably the removed body and the trailing assertSeqNos()
// call its replacement — confirm against the actual file before treating both as live code.
@Override
protected void beforeIndexDeletion() throws Exception {
super.beforeIndexDeletion();
assertBusy(() -> {
IndicesStatsResponse stats = client().admin().indices().prepareStats().clear().get();
for (IndexStats indexStats : stats.getIndices().values()) {
for (IndexShardStats indexShardStats : indexStats.getIndexShards().values()) {
// locate the active primary copy of this shard group; groups without one are skipped
Optional<ShardStats> maybePrimary = Stream.of(indexShardStats.getShards())
.filter(s -> s.getShardRouting().active() && s.getShardRouting().primary())
.findFirst();
if (maybePrimary.isPresent() == false) {
continue;
}
ShardStats primary = maybePrimary.get();
final SeqNoStats primarySeqNoStats = primary.getSeqNoStats();
final ShardRouting primaryShardRouting = primary.getShardRouting();
assertThat(primaryShardRouting + " should have set the global checkpoint",
primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)));
// look up the IndexShard instance on the node that currently hosts the primary
final DiscoveryNode node = clusterService().state().nodes().get(primaryShardRouting.currentNodeId());
final IndicesService indicesService =
internalCluster().getInstance(IndicesService.class, node.getName());
final IndexShard indexShard = indicesService.getShardOrNull(primaryShardRouting.shardId());
final ObjectLongMap<String> globalCheckpoints = indexShard.getInSyncGlobalCheckpoints();
// every copy in the group must report the same checkpoints and max seq no as the primary
for (ShardStats shardStats : indexShardStats) {
final SeqNoStats seqNoStats = shardStats.getSeqNoStats();
assertThat(shardStats.getShardRouting() + " local checkpoint mismatch",
seqNoStats.getLocalCheckpoint(), equalTo(primarySeqNoStats.getLocalCheckpoint()));
assertThat(shardStats.getShardRouting() + " global checkpoint mismatch",
seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint()));
assertThat(shardStats.getShardRouting() + " max seq no mismatch",
seqNoStats.getMaxSeqNo(), equalTo(primarySeqNoStats.getMaxSeqNo()));
// the local knowledge on the primary of the global checkpoint equals the global checkpoint on the shard
assertThat(
seqNoStats.getGlobalCheckpoint(),
equalTo(globalCheckpoints.get(shardStats.getShardRouting().allocationId().getId())));
}
}
}
});
assertSeqNos();
}

public void testSimpleRelocationNoIndexing() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.test;

import com.carrotsearch.hppc.ObjectLongMap;
import com.carrotsearch.randomizedtesting.RandomizedContext;
import com.carrotsearch.randomizedtesting.annotations.TestGroup;
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
Expand Down Expand Up @@ -49,6 +50,10 @@
import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
Expand All @@ -69,6 +74,7 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -114,6 +120,9 @@
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.engine.Segment;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.indices.IndicesRequestCache;
Expand Down Expand Up @@ -161,6 +170,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -191,6 +201,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;

Expand Down Expand Up @@ -2194,4 +2205,44 @@ public static Index resolveIndex(String index) {
String uuid = getIndexResponse.getSettings().get(index).get(IndexMetaData.SETTING_INDEX_UUID);
return new Index(index, uuid);
}

/**
 * Asserts, inside {@code assertBusy}, that the sequence-number stats of all shard copies agree with
 * their primary. For every shard group that has an active primary this checks that:
 * <ul>
 * <li>the primary's global checkpoint is not {@link SequenceNumbers#UNASSIGNED_SEQ_NO};</li>
 * <li>each copy's local checkpoint, global checkpoint and max sequence number equal the primary's;</li>
 * <li>the global checkpoint the primary tracks for a copy (keyed by allocation id in
 * {@code getInSyncGlobalCheckpoints()}) equals the global checkpoint that copy reports.</li>
 * </ul>
 * Shard groups without an active primary are skipped, as are groups whose primary node or primary shard
 * instance can no longer be resolved locally.
 *
 * @throws Exception if thrown by {@code assertBusy}
 */
protected void assertSeqNos() throws Exception {
    assertBusy(() -> {
        final IndicesStatsResponse stats = client().admin().indices().prepareStats().clear().get();
        for (IndexStats indexStats : stats.getIndices().values()) {
            for (IndexShardStats indexShardStats : indexStats.getIndexShards().values()) {
                final Optional<ShardStats> maybePrimary = Stream.of(indexShardStats.getShards())
                    .filter(s -> s.getShardRouting().active() && s.getShardRouting().primary())
                    .findFirst();
                if (maybePrimary.isPresent() == false) {
                    continue;
                }
                final ShardStats primary = maybePrimary.get();
                final SeqNoStats primarySeqNoStats = primary.getSeqNoStats();
                final ShardRouting primaryShardRouting = primary.getShardRouting();
                assertThat(primaryShardRouting + " should have set the global checkpoint",
                    primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)));
                final DiscoveryNode node = clusterService().state().nodes().get(primaryShardRouting.currentNodeId());
                if (node == null) {
                    // the stats response and the cluster state are read at different times; presumably the node
                    // hosting the primary has left in between, so there is nothing to compare against
                    continue;
                }
                final IndicesService indicesService =
                    internalCluster().getInstance(IndicesService.class, node.getName());
                final IndexShard indexShard = indicesService.getShardOrNull(primaryShardRouting.shardId());
                if (indexShard == null) {
                    // getShardOrNull signals a missing shard with null (e.g. the shard moved or was closed);
                    // skip instead of failing with a NullPointerException, which is not an assertion failure
                    continue;
                }
                final ObjectLongMap<String> globalCheckpoints = indexShard.getInSyncGlobalCheckpoints();
                for (ShardStats shardStats : indexShardStats) {
                    final SeqNoStats seqNoStats = shardStats.getSeqNoStats();
                    assertThat(shardStats.getShardRouting() + " local checkpoint mismatch",
                        seqNoStats.getLocalCheckpoint(), equalTo(primarySeqNoStats.getLocalCheckpoint()));
                    assertThat(shardStats.getShardRouting() + " global checkpoint mismatch",
                        seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint()));
                    assertThat(shardStats.getShardRouting() + " max seq no mismatch",
                        seqNoStats.getMaxSeqNo(), equalTo(primarySeqNoStats.getMaxSeqNo()));
                    // the local knowledge on the primary of the global checkpoint equals the global checkpoint on the shard
                    assertThat(shardStats.getShardRouting() + " global checkpoint tracked on the primary mismatch",
                        seqNoStats.getGlobalCheckpoint(),
                        equalTo(globalCheckpoints.get(shardStats.getShardRouting().allocationId().getId())));
                }
            }
        }
    });
}

}