Skip to content

Commit

Permalink
Fix concurrent migrate tests
Browse files Browse the repository at this point in the history
I was using a CountDownLatch like a CyclicBarrier....
  • Loading branch information
nik9000 committed Aug 16, 2016
1 parent b975683 commit 47271a8
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.indices.migrate.TransportMigrateIndexAction.DocumentMigrater;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void testMigrateNonEmptyIndex() throws InterruptedException, ExecutionExc
Script script = new Script("ctx._source.foo += ' cat'", ScriptType.INLINE, CustomScriptPlugin.NAME, emptyMap());
migrateIndexTestCase(docCount, script);

// And now the script is applied to the alias! Huzzah!
// the migration applied the script to the documents
SearchResponse searchResponse = client().prepareSearch("test").setSize(0).setQuery(matchQuery("foo", "cat")).get();
assertHitCount(searchResponse, docCount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.elasticsearch.index;

import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.indices.migrate.MigrateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.migrate.MigrateIndexResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
Expand All @@ -30,11 +30,9 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand Down Expand Up @@ -101,29 +99,35 @@ protected void migrateIndexTestCase(int docCount, Script script) throws Interrup
* to be able to consistently use this API on startup in all nodes.*/
MigrateIndexRequestBuilder migrate = client().admin().indices().prepareMigrateIndex("test_2", "test_3")
.setAliases("test").setScript(script);
int latchSize = 10;
CountDownLatch latch = new CountDownLatch(latchSize);
ExecutorService executor = Executors.newFixedThreadPool(between(2, Runtime.getRuntime().availableProcessors()));
try {
int totalRequests = between(1, 100) * latchSize;
List<Future<ListenableActionFuture<MigrateIndexResponse>>> tasks = new ArrayList<>(totalRequests);
for (int i = 0; i < totalRequests; i++) {
tasks.add(executor.submit(() -> {
int concurrentRequests = between(2, Runtime.getRuntime().availableProcessors());
CyclicBarrier latch = new CyclicBarrier(concurrentRequests);
int requestsPerThread = between(5, 100);
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < concurrentRequests; i++) {
Thread t = new Thread(() ->{
for (int r = 0; r < requestsPerThread; r++) {
try {
latch.await();
} catch (InterruptedException e) {
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
return migrate.execute();
}));
}
for (Future<ListenableActionFuture<MigrateIndexResponse>> task : tasks) {
MigrateIndexResponse response = task.get(20, TimeUnit.SECONDS).get();
assertTrue(response.isAcknowledged());
MigrateIndexResponse response = migrate.get();
assertTrue(response.isAcknowledged());
assertFalse(client().admin().indices().prepareExists("test_2").get().isExists());
assertTrue(client().admin().indices().prepareExists("test_3").get().isExists());
}
});
t.setName(getTestName() + "#" + i);
threads.add(t);
t.start();
}
try {
for (Thread thread : threads) {
thread.join(TimeUnit.SECONDS.toMillis(30));
}
} finally {
executor.shutdown();
} catch (InterruptedException e) {
ListTasksResponse listTasks = client().admin().cluster().prepareListTasks().get();
throw new RuntimeException("Failed while waiting for migrations. These are the running tasks: " + listTasks, e);
}

}
}

0 comments on commit 47271a8

Please sign in to comment.