Skip to content

Commit

Permalink
Add distruption test
Browse files Browse the repository at this point in the history
  • Loading branch information
jasontedor committed Sep 21, 2017
1 parent f3b04dc commit 5030a77
Showing 1 changed file with 95 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,39 @@
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
Expand All @@ -48,17 +66,89 @@ public class GlobalCheckpointSyncIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Stream.concat(super.nodePlugins().stream(), Stream.of(InternalSettingsPlugin.class)).collect(Collectors.toList());
return Stream.concat(super.nodePlugins().stream(), Stream.of(InternalSettingsPlugin.class, MockTransportService.TestPlugin.class)).collect(Collectors.toList());
}

public void testPostOperationGlobalCheckpointSync() throws Exception {
internalCluster().startNode();
runGlobalCheckpointSyncTest(TimeValue.timeValueHours(24), client -> {}, client -> {});
}

/*
* This test swallows the post-operation global checkpoint syncs, and then restores the ability to send these requests at the end of the
* test so that a background sync can fire and sync the global checkpoint.
*/
public void testBackgroundGlobalCheckpointSync() throws Exception {
runGlobalCheckpointSyncTest(
TimeValue.timeValueSeconds(randomIntBetween(1, 3)),
client -> {
// prevent global checkpoint syncs between all nodes
final DiscoveryNodes nodes = client.admin().cluster().prepareState().get().getState().getNodes();
for (final DiscoveryNode node : nodes) {
for (final DiscoveryNode other : nodes) {
if (node == other) {
continue;
}
final MockTransportService senderTransportService =
(MockTransportService) internalCluster().getInstance(TransportService.class, node.getName());
final MockTransportService receiverTransportService =
(MockTransportService) internalCluster().getInstance(TransportService.class, other.getName());

senderTransportService.addDelegate(receiverTransportService,
new MockTransportService.DelegateTransport(senderTransportService.original()) {
@Override
protected void sendRequest(
final Connection connection,
final long requestId,
final String action,
final TransportRequest request,
final TransportRequestOptions options) throws IOException {
if ("indices:admin/seq_no/global_checkpoint_sync[r]".equals(action)) {
throw new IllegalStateException("blocking indices:admin/seq_no/global_checkpoint_sync[r]");
} else {
super.sendRequest(connection, requestId, action, request, options);
}
}
});
}
}
},
client -> {
// restore global checkpoint syncs between all nodes
final DiscoveryNodes nodes = client.admin().cluster().prepareState().get().getState().getNodes();
for (final DiscoveryNode node : nodes) {
for (final DiscoveryNode other : nodes) {
if (node == other) {
continue;
}
final MockTransportService senderTransportService =
(MockTransportService) internalCluster().getInstance(TransportService.class, node.getName());
final MockTransportService receiverTransportService =
(MockTransportService) internalCluster().getInstance(TransportService.class, other.getName());
senderTransportService.clearRule(receiverTransportService);
}
}
});
}

private void runGlobalCheckpointSyncTest(
final TimeValue globalCheckpointSyncInterval,
final Consumer<Client> beforeIndexing,
final Consumer<Client> afterIndexing) throws Exception {
final int numberOfReplicas = randomIntBetween(1, 4);
internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas);
// set the sync interval high so it does not execute during this test
prepareCreate("test", Settings.builder().put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "24h")).get();
prepareCreate(
"test",
Settings.builder()
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), globalCheckpointSyncInterval)
.put("index.number_of_replicas", numberOfReplicas))
.get();
if (randomBoolean()) {
ensureGreen();
}

beforeIndexing.accept(client());

final int numberOfDocuments = randomIntBetween(0, 256);

final int numberOfThreads = randomIntBetween(1, 4);
Expand Down Expand Up @@ -94,6 +184,8 @@ public void testPostOperationGlobalCheckpointSync() throws Exception {
// wait for the threads to finish
barrier.await();

afterIndexing.accept(client());

assertBusy(() -> {
final IndicesStatsResponse stats = client().admin().indices().prepareStats().clear().get();
final IndexStats indexStats = stats.getIndex("test");
Expand Down

0 comments on commit 5030a77

Please sign in to comment.