diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexPlugin.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexPlugin.java index d51541d4e0880..51919883b11d5 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexPlugin.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexPlugin.java @@ -23,7 +23,6 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.admin.indices.migrate.TransportMigrateIndexAction.DocumentMigrater; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/MigrateIndexWithReindexTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/MigrateIndexWithReindexTests.java index 299633c434031..06a0c68f9c3c4 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/MigrateIndexWithReindexTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/MigrateIndexWithReindexTests.java @@ -53,7 +53,7 @@ public void testMigrateNonEmptyIndex() throws InterruptedException, ExecutionExc Script script = new Script("ctx._source.foo += ' cat'", ScriptType.INLINE, CustomScriptPlugin.NAME, emptyMap()); migrateIndexTestCase(docCount, script); - // And now the script is applied to the alias! Huzzah! + // the migration applied the script to the documents SearchResponse searchResponse = client().prepareSearch("test").setSize(0).setQuery(matchQuery("foo", "cat")).get(); assertHitCount(searchResponse, docCount); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/MigrateIndexTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/MigrateIndexTestCase.java index 50863d7d2821a..34510db3a07c1 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/MigrateIndexTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/MigrateIndexTestCase.java @@ -19,7 +19,7 @@ package org.elasticsearch.index; -import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.indices.migrate.MigrateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.migrate.MigrateIndexResponse; import org.elasticsearch.action.index.IndexRequestBuilder; @@ -30,11 +30,9 @@ import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -101,29 +99,35 @@ protected void migrateIndexTestCase(int docCount, Script script) throws Interrup * to be able to consistently use this API on startup in all nodes.*/ MigrateIndexRequestBuilder migrate = client().admin().indices().prepareMigrateIndex("test_2", "test_3") .setAliases("test").setScript(script); - int latchSize = 10; - CountDownLatch latch = new CountDownLatch(latchSize); - ExecutorService executor = Executors.newFixedThreadPool(between(2, Runtime.getRuntime().availableProcessors())); - try { - int totalRequests = between(1, 100) * latchSize; - List>> tasks = new ArrayList<>(totalRequests); - for (int i = 0; i < totalRequests; i++) { - tasks.add(executor.submit(() -> { + int concurrentRequests = between(2, Runtime.getRuntime().availableProcessors()); + CyclicBarrier latch = new CyclicBarrier(concurrentRequests); + int requestsPerThread = between(5, 100); + List threads = new ArrayList<>(); + for (int i = 0; i < concurrentRequests; i++) { + Thread t = new Thread(() ->{ + for (int r = 0; r < requestsPerThread; r++) { try { latch.await(); - } catch (InterruptedException e) { + } catch (InterruptedException | BrokenBarrierException e) { throw new RuntimeException(e); } - return migrate.execute(); - })); - } - for (Future> task : tasks) { - MigrateIndexResponse response = task.get(20, TimeUnit.SECONDS).get(); - assertTrue(response.isAcknowledged()); + MigrateIndexResponse response = migrate.get(); + assertTrue(response.isAcknowledged()); + assertFalse(client().admin().indices().prepareExists("test_2").get().isExists()); + assertTrue(client().admin().indices().prepareExists("test_3").get().isExists()); + } + }); + t.setName(getTestName() + "#" + i); + threads.add(t); + t.start(); + } + try { + for (Thread thread : threads) { + thread.join(TimeUnit.SECONDS.toMillis(30)); } - } finally { - executor.shutdown(); + } catch (InterruptedException e) { + ListTasksResponse listTasks = client().admin().cluster().prepareListTasks().get(); + throw new RuntimeException("Failed while waiting for migrations. These are the running tasks: " + listTasks, e); } - } }