Skip to content

Commit

Permalink
Add assertBusy helper test method
Browse files Browse the repository at this point in the history
We use awaitBusy in our tests, the problem is that we have to check if it awaited or not, and then try and keep around somehow more info around why the predicate failed and a timeout happened.
The idea of assertBusy is to allow to simply write "regular" test code, and if the test code trips, it will busy wait till a timeout. This allows us to keep around the assertion information and properly throw it for information that is inherently kept in the failure itself.
  • Loading branch information
kimchy committed Jul 8, 2014
1 parent 7335b5d commit 10030a6
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 105 deletions.
147 changes: 42 additions & 105 deletions src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -91,6 +92,7 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.test.client.RandomizingClient;
import org.hamcrest.Matchers;
import org.junit.*;

import java.io.IOException;
Expand All @@ -99,10 +101,7 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;

import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
Expand All @@ -112,6 +111,7 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;

/**
* {@link ElasticsearchIntegrationTest} is an abstract base class to run integration
Expand Down Expand Up @@ -715,152 +715,89 @@ private ImmutableSettings.Builder getExcludeSettings(String index, int num, Immu
/**
* Waits until all nodes have no pending tasks.
*/
public void waitNoPendingTasksOnAll() throws InterruptedException {
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();
final PendingClusterTasksResponse[] reference = new PendingClusterTasksResponse[1];
boolean applied = awaitBusy(new Predicate<Object>() {
public void waitNoPendingTasksOnAll() throws Exception {
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get());
assertBusy(new Runnable() {
@Override
public boolean apply(Object input) {
reference[0] = null;
public void run() {
for (Client client : clients()) {
PendingClusterTasksResponse pendingTasks = client.admin().cluster().preparePendingClusterTasks().setLocal(true).get();
if (!pendingTasks.pendingTasks().isEmpty()) {
reference[0] = pendingTasks;
return false;
}
assertThat("client " + client + " still has pending tasks " + pendingTasks.prettyPrint(), pendingTasks, Matchers.emptyIterable());
}
return true;
}
});
if (!applied) {
fail(reference[0].prettyPrint());
}
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get());
}

/**
* Waits until the elected master node has no pending tasks.
*/
public void waitNoPendingTasksOnMaster() throws InterruptedException {
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();
final PendingClusterTasksResponse[] reference = new PendingClusterTasksResponse[1];
boolean applied = awaitBusy(new Predicate<Object>() {
public void waitNoPendingTasksOnMaster() throws Exception {
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get());
assertBusy(new Runnable() {
@Override
public boolean apply(Object input) {
reference[0] = null;
PendingClusterTasksResponse pendingTasks = client().admin().cluster().preparePendingClusterTasks().get();
if (!pendingTasks.pendingTasks().isEmpty()) {
reference[0] = pendingTasks;
return false;
}
return true;
public void run() {
PendingClusterTasksResponse pendingTasks = client().admin().cluster().preparePendingClusterTasks().setLocal(true).get();
assertThat("master still has pending tasks " + pendingTasks.prettyPrint(), pendingTasks, Matchers.emptyIterable());
}
});
if (!applied) {
fail(reference[0].prettyPrint());
}
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get());
}

/**
* Waits till a (pattern) field name mappings concretely exists on all nodes. Note, this waits for the current
* started shards and checks for concrete mappings.
*/
public void waitForConcreteMappingsOnAll(final String index, final String type, final String... fieldNames) throws InterruptedException {
boolean applied = awaitBusy(new Predicate<Object>() {
public void waitForConcreteMappingsOnAll(final String index, final String type, final String... fieldNames) throws Exception {
assertBusy(new Runnable() {
@Override
public boolean apply(Object input) {
try {
Set<String> nodes = internalCluster().nodesInclude(index);
if (nodes.isEmpty()) { // we expect at least one node to hold an index, so wait if not allocated yet
return false;
}
for (String node : nodes) {
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
return false;
}
DocumentMapper documentMapper = indexService.mapperService().documentMapper(type);
if (documentMapper == null) {
return false;
}
for (String fieldName : fieldNames) {
Set<String> matches = documentMapper.mappers().simpleMatchToFullName(fieldName);
if (matches.isEmpty()) {
return false;
}
}
public void run() {
Set<String> nodes = internalCluster().nodesInclude(index);
assertThat(nodes, Matchers.not(Matchers.emptyIterable()));
for (String node : nodes) {
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexService(index);
assertThat("index service doesn't exists on " + node, indexService, notNullValue());
DocumentMapper documentMapper = indexService.mapperService().documentMapper(type);
assertThat("document mapper doesn't exists on " + node, documentMapper, notNullValue());
for (String fieldName : fieldNames) {
Set<String> matches = documentMapper.mappers().simpleMatchToFullName(fieldName);
assertThat("field " + fieldName + " doesn't exists on " + node, matches, Matchers.not(emptyIterable()));
}
} catch (Exception e) {
logger.info("got exception waiting for concrete mappings", e);
return false;
}
return true;
}
});
if (!applied) {
fail("failed to find mappings on all nodes for index " + index + ", type " + type + ", and fieldName " + Arrays.toString(fieldNames));
}
waitForMappingOnMaster(index, type, fieldNames);
}

/**
* Waits for the given mapping type to exists on the master node.
*/
public void waitForMappingOnMaster(final String index, final String type, final String... fieldNames) throws InterruptedException {
final GetMappingsResponse[] lastResponse = new GetMappingsResponse[1];
boolean applied = awaitBusy(new Predicate<Object>() {
public void waitForMappingOnMaster(final String index, final String type, final String... fieldNames) throws Exception {
assertBusy(new Callable() {
@Override
public boolean apply(Object input) {
GetMappingsResponse response = lastResponse[0] = client().admin().indices().prepareGetMappings(index).setTypes(type).get();
public Object call() throws Exception {
GetMappingsResponse response = client().admin().indices().prepareGetMappings(index).setTypes(type).get();
ImmutableOpenMap<String, MappingMetaData> mappings = response.getMappings().get(index);
if (mappings == null) {
return false;
}
assertThat(mappings, notNullValue());
MappingMetaData mappingMetaData = mappings.get(type);
if (mappingMetaData == null) {
return false;
}
assertThat(mappingMetaData, notNullValue());

Map<String, Object> mappingSource;
try {
mappingSource = mappingMetaData.getSourceAsMap();
} catch (IOException e) {
throw ExceptionsHelper.convertToElastic(e);
}
if (mappingSource.isEmpty() && !mappingSource.containsKey("properties")) {
return false;
}
Map<String, Object> mappingSource = mappingMetaData.getSourceAsMap();
assertFalse(mappingSource.isEmpty());
assertTrue(mappingSource.containsKey("properties"));

for (String fieldName : fieldNames) {
Map<String, Object> mappingProperties = (Map<String, Object>) mappingSource.get("properties");
if (fieldName.indexOf('.') != -1) {
fieldName = fieldName.replace(".", ".properties.");
}
if (XContentMapValues.extractValue(fieldName, mappingProperties) == null) {
return false;
}
assertThat("field " + fieldName + " doesn't exists in mapping " + mappingMetaData.source().string(), XContentMapValues.extractValue(fieldName, mappingProperties), notNullValue());
}

return true;
return null;
}
});
if (!applied) {
String source = null;
ImmutableOpenMap<String, MappingMetaData> mappings = lastResponse[0].getMappings().get(index);
if (mappings != null) {
MappingMetaData mappingMetaData = mappings.get(type);
if (mappingMetaData != null) {
try {
source = mappingMetaData.source().string();
} catch (IOException e) {
throw ExceptionsHelper.convertToElastic(e);
}
}
}
fail("failed to find mappings for index " + index + ", type " + type + ", fields " + fieldNames + ", on master node, mapping source [" + source + "]");
}
}

/**
Expand Down
52 changes: 52 additions & 0 deletions src/test/java/org/elasticsearch/test/ElasticsearchTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import java.lang.reflect.Modifier;
import java.net.URI;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllFilesClosed;
Expand Down Expand Up @@ -97,6 +99,56 @@ public abstract class ElasticsearchTestCase extends AbstractRandomizedTest {

}

/**
* Runs the code block for 10 seconds waiting for no assertion to trip.
*/
public static void assertBusy(Runnable codeBlock) throws Exception {
assertBusy(Executors.callable(codeBlock), 10, TimeUnit.SECONDS);
}

public static void assertBusy(Runnable codeBlock, long maxWaitTime, TimeUnit unit) throws Exception {
assertBusy(Executors.callable(codeBlock), maxWaitTime, unit);
}

/**
* Runs the code block for 10 seconds waiting for no assertion to trip.
*/
public static <V> V assertBusy(Callable<V> codeBlock) throws Exception {
return assertBusy(codeBlock, 10, TimeUnit.SECONDS);
}

/**
* Runs the code block for the provided interval, waiting for no assertions to trip.
*/
public static <V> V assertBusy(Callable<V> codeBlock, long maxWaitTime, TimeUnit unit) throws Exception {
long maxTimeInMillis = TimeUnit.MILLISECONDS.convert(maxWaitTime, unit);
long iterations = Math.max(Math.round(Math.log10(maxTimeInMillis) / Math.log10(2)), 1);
long timeInMillis = 1;
long sum = 0;
List<AssertionError> failures = new ArrayList<>();
for (int i = 0; i < iterations; i++) {
try {
return codeBlock.call();
} catch (AssertionError e) {
failures.add(e);
}
sum += timeInMillis;
Thread.sleep(timeInMillis);
timeInMillis *= 2;
}
timeInMillis = maxTimeInMillis - sum;
Thread.sleep(Math.max(timeInMillis, 0));
try {
return codeBlock.call();
} catch (AssertionError e) {
for (AssertionError failure : failures) {
e.addSuppressed(failure);
}
throw e;
}
}


public static boolean awaitBusy(Predicate<?> breakPredicate) throws InterruptedException {
return awaitBusy(breakPredicate, 10, TimeUnit.SECONDS);
}
Expand Down

0 comments on commit 10030a6

Please sign in to comment.