From 5030a776cfd859e89522cfd9c087c59afdee4b22 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 21 Sep 2017 12:53:39 -0400 Subject: [PATCH] Add distruption test --- .../index/seqno/GlobalCheckpointSyncIT.java | 98 ++++++++++++++++++- 1 file changed, 95 insertions(+), 3 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncIT.java b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncIT.java index 406fecfc47f32..b52161a73e16f 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncIT.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncIT.java @@ -23,21 +23,39 @@ import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalSettingsPlugin; +import org.elasticsearch.test.disruption.NetworkDisruption; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -48,17 +66,89 @@ public class GlobalCheckpointSyncIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return Stream.concat(super.nodePlugins().stream(), Stream.of(InternalSettingsPlugin.class)).collect(Collectors.toList()); + return Stream.concat(super.nodePlugins().stream(), Stream.of(InternalSettingsPlugin.class, MockTransportService.TestPlugin.class)).collect(Collectors.toList()); } public void testPostOperationGlobalCheckpointSync() throws Exception { - internalCluster().startNode(); + runGlobalCheckpointSyncTest(TimeValue.timeValueHours(24), client -> {}, client -> {}); + } + + /* + * This test swallows the post-operation global checkpoint syncs, and then restores the ability to send these requests at the end of the + * test so that a background sync can fire and sync the global checkpoint. + */ + public void testBackgroundGlobalCheckpointSync() throws Exception { + runGlobalCheckpointSyncTest( + TimeValue.timeValueSeconds(randomIntBetween(1, 3)), + client -> { + // prevent global checkpoint syncs between all nodes + final DiscoveryNodes nodes = client.admin().cluster().prepareState().get().getState().getNodes(); + for (final DiscoveryNode node : nodes) { + for (final DiscoveryNode other : nodes) { + if (node == other) { + continue; + } + final MockTransportService senderTransportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, node.getName()); + final MockTransportService receiverTransportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, other.getName()); + + senderTransportService.addDelegate(receiverTransportService, + new MockTransportService.DelegateTransport(senderTransportService.original()) { + @Override + protected void sendRequest( + final Connection connection, + final long requestId, + final String action, + final TransportRequest request, + final TransportRequestOptions options) throws IOException { + if ("indices:admin/seq_no/global_checkpoint_sync[r]".equals(action)) { + throw new IllegalStateException("blocking indices:admin/seq_no/global_checkpoint_sync[r]"); + } else { + super.sendRequest(connection, requestId, action, request, options); + } + } + }); + } + } + }, + client -> { + // restore global checkpoint syncs between all nodes + final DiscoveryNodes nodes = client.admin().cluster().prepareState().get().getState().getNodes(); + for (final DiscoveryNode node : nodes) { + for (final DiscoveryNode other : nodes) { + if (node == other) { + continue; + } + final MockTransportService senderTransportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, node.getName()); + final MockTransportService receiverTransportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, other.getName()); + senderTransportService.clearRule(receiverTransportService); + } + } + }); + } + + private void runGlobalCheckpointSyncTest( + final TimeValue globalCheckpointSyncInterval, + final Consumer beforeIndexing, + final Consumer afterIndexing) throws Exception { + final int numberOfReplicas = randomIntBetween(1, 4); + internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas); // set the sync interval high so it does not execute during this test - prepareCreate("test", Settings.builder().put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "24h")).get(); + prepareCreate( + "test", + Settings.builder() + .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), globalCheckpointSyncInterval) + .put("index.number_of_replicas", numberOfReplicas)) + .get(); if (randomBoolean()) { ensureGreen(); } + beforeIndexing.accept(client()); + final int numberOfDocuments = randomIntBetween(0, 256); final int numberOfThreads = randomIntBetween(1, 4); @@ -94,6 +184,8 @@ public void testPostOperationGlobalCheckpointSync() throws Exception { // wait for the threads to finish barrier.await(); + afterIndexing.accept(client()); + assertBusy(() -> { final IndicesStatsResponse stats = client().admin().indices().prepareStats().clear().get(); final IndexStats indexStats = stats.getIndex("test");