From 10030a63cc5a10bf8a8726fc670f240b1ce98623 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Sun, 6 Jul 2014 12:34:41 +0200 Subject: [PATCH] Add assertBusy helper test method We use awaitBusy in our tests, the problem is that we have to check if it awaited or not, and then try and keep around somehow more info around why the predicate failed and a timeout happened. The idea of assertBusy is to allow to simply write "regular" test code, and if the test code trips, it will busy wait till a timeout. This allows us to keep around the assertion information and properly throw it for information that is inherently kept in the failure itself. --- .../test/ElasticsearchIntegrationTest.java | 147 +++++------------- .../test/ElasticsearchTestCase.java | 52 +++++++ 2 files changed, 94 insertions(+), 105 deletions(-) diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index ea158860aaba3..d0281ebe13da4 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -56,6 +56,7 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.PendingClusterTask; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; @@ -91,6 +92,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchService; import org.elasticsearch.test.client.RandomizingClient; +import org.hamcrest.Matchers; import org.junit.*; import java.io.IOException; @@ -99,10 +101,7 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import java.util.*; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; @@ -112,6 +111,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; /** * {@link ElasticsearchIntegrationTest} is an abstract base class to run integration @@ -715,152 +715,89 @@ private ImmutableSettings.Builder getExcludeSettings(String index, int num, Immu /** * Waits until all nodes have no pending tasks. */ - public void waitNoPendingTasksOnAll() throws InterruptedException { - client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); - final PendingClusterTasksResponse[] reference = new PendingClusterTasksResponse[1]; - boolean applied = awaitBusy(new Predicate() { + public void waitNoPendingTasksOnAll() throws Exception { + assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get()); + assertBusy(new Runnable() { @Override - public boolean apply(Object input) { - reference[0] = null; + public void run() { for (Client client : clients()) { PendingClusterTasksResponse pendingTasks = client.admin().cluster().preparePendingClusterTasks().setLocal(true).get(); - if (!pendingTasks.pendingTasks().isEmpty()) { - reference[0] = pendingTasks; - return false; - } + assertThat("client " + client + " still has pending tasks " + pendingTasks.prettyPrint(), pendingTasks, Matchers.emptyIterable()); } - return true; } }); - if (!applied) { - fail(reference[0].prettyPrint()); - } - client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get()); } /** * Waits until the elected master node has no pending tasks. */ - public void waitNoPendingTasksOnMaster() throws InterruptedException { - client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); - final PendingClusterTasksResponse[] reference = new PendingClusterTasksResponse[1]; - boolean applied = awaitBusy(new Predicate() { + public void waitNoPendingTasksOnMaster() throws Exception { + assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get()); + assertBusy(new Runnable() { @Override - public boolean apply(Object input) { - reference[0] = null; - PendingClusterTasksResponse pendingTasks = client().admin().cluster().preparePendingClusterTasks().get(); - if (!pendingTasks.pendingTasks().isEmpty()) { - reference[0] = pendingTasks; - return false; - } - return true; + public void run() { + PendingClusterTasksResponse pendingTasks = client().admin().cluster().preparePendingClusterTasks().setLocal(true).get(); + assertThat("master still has pending tasks " + pendingTasks.prettyPrint(), pendingTasks, Matchers.emptyIterable()); } }); - if (!applied) { - fail(reference[0].prettyPrint()); - } - client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get()); } /** * Waits till a (pattern) field name mappings concretely exists on all nodes. Note, this waits for the current * started shards and checks for concrete mappings. */ - public void waitForConcreteMappingsOnAll(final String index, final String type, final String... fieldNames) throws InterruptedException { - boolean applied = awaitBusy(new Predicate() { + public void waitForConcreteMappingsOnAll(final String index, final String type, final String... fieldNames) throws Exception { + assertBusy(new Runnable() { @Override - public boolean apply(Object input) { - try { - Set nodes = internalCluster().nodesInclude(index); - if (nodes.isEmpty()) { // we expect at least one node to hold an index, so wait if not allocated yet - return false; - } - for (String node : nodes) { - IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); - IndexService indexService = indicesService.indexService(index); - if (indexService == null) { - return false; - } - DocumentMapper documentMapper = indexService.mapperService().documentMapper(type); - if (documentMapper == null) { - return false; - } - for (String fieldName : fieldNames) { - Set matches = documentMapper.mappers().simpleMatchToFullName(fieldName); - if (matches.isEmpty()) { - return false; - } - } + public void run() { + Set nodes = internalCluster().nodesInclude(index); + assertThat(nodes, Matchers.not(Matchers.emptyIterable())); + for (String node : nodes) { + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); + IndexService indexService = indicesService.indexService(index); + assertThat("index service doesn't exists on " + node, indexService, notNullValue()); + DocumentMapper documentMapper = indexService.mapperService().documentMapper(type); + assertThat("document mapper doesn't exists on " + node, documentMapper, notNullValue()); + for (String fieldName : fieldNames) { + Set matches = documentMapper.mappers().simpleMatchToFullName(fieldName); + assertThat("field " + fieldName + " doesn't exists on " + node, matches, Matchers.not(emptyIterable())); } - } catch (Exception e) { - logger.info("got exception waiting for concrete mappings", e); - return false; } - return true; } }); - if (!applied) { - fail("failed to find mappings on all nodes for index " + index + ", type " + type + ", and fieldName " + Arrays.toString(fieldNames)); - } waitForMappingOnMaster(index, type, fieldNames); } /** * Waits for the given mapping type to exists on the master node. */ - public void waitForMappingOnMaster(final String index, final String type, final String... fieldNames) throws InterruptedException { - final GetMappingsResponse[] lastResponse = new GetMappingsResponse[1]; - boolean applied = awaitBusy(new Predicate() { + public void waitForMappingOnMaster(final String index, final String type, final String... fieldNames) throws Exception { + assertBusy(new Callable() { @Override - public boolean apply(Object input) { - GetMappingsResponse response = lastResponse[0] = client().admin().indices().prepareGetMappings(index).setTypes(type).get(); + public Object call() throws Exception { + GetMappingsResponse response = client().admin().indices().prepareGetMappings(index).setTypes(type).get(); ImmutableOpenMap mappings = response.getMappings().get(index); - if (mappings == null) { - return false; - } + assertThat(mappings, notNullValue()); MappingMetaData mappingMetaData = mappings.get(type); - if (mappingMetaData == null) { - return false; - } + assertThat(mappingMetaData, notNullValue()); - Map mappingSource; - try { - mappingSource = mappingMetaData.getSourceAsMap(); - } catch (IOException e) { - throw ExceptionsHelper.convertToElastic(e); - } - if (mappingSource.isEmpty() && !mappingSource.containsKey("properties")) { - return false; - } + Map mappingSource = mappingMetaData.getSourceAsMap(); + assertFalse(mappingSource.isEmpty()); + assertTrue(mappingSource.containsKey("properties")); for (String fieldName : fieldNames) { Map mappingProperties = (Map) mappingSource.get("properties"); if (fieldName.indexOf('.') != -1) { fieldName = fieldName.replace(".", ".properties."); } - if (XContentMapValues.extractValue(fieldName, mappingProperties) == null) { - return false; - } + assertThat("field " + fieldName + " doesn't exists in mapping " + mappingMetaData.source().string(), XContentMapValues.extractValue(fieldName, mappingProperties), notNullValue()); } - return true; + return null; } }); - if (!applied) { - String source = null; - ImmutableOpenMap mappings = lastResponse[0].getMappings().get(index); - if (mappings != null) { - MappingMetaData mappingMetaData = mappings.get(type); - if (mappingMetaData != null) { - try { - source = mappingMetaData.source().string(); - } catch (IOException e) { - throw ExceptionsHelper.convertToElastic(e); - } - } - } - fail("failed to find mappings for index " + index + ", type " + type + ", fields " + fieldNames + ", on master node, mapping source [" + source + "]"); - } } /** diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchTestCase.java b/src/test/java/org/elasticsearch/test/ElasticsearchTestCase.java index 545ca6925a770..97e6c9b8bd81f 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchTestCase.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchTestCase.java @@ -55,6 +55,8 @@ import java.lang.reflect.Modifier; import java.net.URI; import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllFilesClosed; @@ -97,6 +99,56 @@ public abstract class ElasticsearchTestCase extends AbstractRandomizedTest { } + /** + * Runs the code block for 10 seconds waiting for no assertion to trip. + */ + public static void assertBusy(Runnable codeBlock) throws Exception { + assertBusy(Executors.callable(codeBlock), 10, TimeUnit.SECONDS); + } + + public static void assertBusy(Runnable codeBlock, long maxWaitTime, TimeUnit unit) throws Exception { + assertBusy(Executors.callable(codeBlock), maxWaitTime, unit); + } + + /** + * Runs the code block for 10 seconds waiting for no assertion to trip. + */ + public static V assertBusy(Callable codeBlock) throws Exception { + return assertBusy(codeBlock, 10, TimeUnit.SECONDS); + } + + /** + * Runs the code block for the provided interval, waiting for no assertions to trip. + */ + public static V assertBusy(Callable codeBlock, long maxWaitTime, TimeUnit unit) throws Exception { + long maxTimeInMillis = TimeUnit.MILLISECONDS.convert(maxWaitTime, unit); + long iterations = Math.max(Math.round(Math.log10(maxTimeInMillis) / Math.log10(2)), 1); + long timeInMillis = 1; + long sum = 0; + List failures = new ArrayList<>(); + for (int i = 0; i < iterations; i++) { + try { + return codeBlock.call(); + } catch (AssertionError e) { + failures.add(e); + } + sum += timeInMillis; + Thread.sleep(timeInMillis); + timeInMillis *= 2; + } + timeInMillis = maxTimeInMillis - sum; + Thread.sleep(Math.max(timeInMillis, 0)); + try { + return codeBlock.call(); + } catch (AssertionError e) { + for (AssertionError failure : failures) { + e.addSuppressed(failure); + } + throw e; + } + } + + public static boolean awaitBusy(Predicate breakPredicate) throws InterruptedException { return awaitBusy(breakPredicate, 10, TimeUnit.SECONDS); }