diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java index 0b3b1ffaea9ad..314c0a325f28e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java @@ -67,14 +67,16 @@ public final class AsyncTaskIndexService> { public static final String EXPIRATION_TIME_FIELD = "expiration_time"; public static final String RESULT_FIELD = "result"; - private static Settings settings() { + static Settings settings() { return Settings.builder() + .put("index.codec", "best_compression") .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1") .build(); } - private static XContentBuilder mappings() throws IOException { + static XContentBuilder mappings() throws IOException { XContentBuilder builder = jsonBuilder() .startObject() .startObject(SINGLE_MAPPING_NAME) @@ -197,7 +199,9 @@ public void updateResponse(String docId, .id(docId) .doc(source, XContentType.JSON) .retryOnConflict(5); - client.update(request, listener); + // updates create the index automatically if it doesn't exist so we force the creation + // preemptively. + createIndexIfNecessary(ActionListener.wrap(v -> client.update(request, listener), listener::onFailure)); } catch(Exception e) { listener.onFailure(e); } @@ -215,7 +219,9 @@ public void updateExpirationTime(String docId, .id(docId) .doc(source, XContentType.JSON) .retryOnConflict(5); - client.update(request, listener); + // updates create the index automatically if it doesn't exist so we force the creation + // preemptively. + createIndexIfNecessary(ActionListener.wrap(v -> client.update(request, listener), listener::onFailure)); } /** @@ -225,7 +231,9 @@ public void deleteResponse(AsyncExecutionId asyncExecutionId, ActionListener listener) { try { DeleteRequest request = new DeleteRequest(index).id(asyncExecutionId.getDocId()); - client.delete(request, listener); + // deletes create the index automatically if it doesn't exist so we force the creation + // preemptively. + createIndexIfNecessary(ActionListener.wrap(v -> client.delete(request, listener), listener::onFailure)); } catch(Exception e) { listener.onFailure(e); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java index 28e9a9c806a2e..99b538da7dd99 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java @@ -7,11 +7,15 @@ import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; @@ -21,7 +25,6 @@ import java.io.IOException; import java.util.Collections; -import java.util.concurrent.ExecutionException; // TODO: test CRUD operations public class AsyncTaskServiceTests extends ESSingleNodeTestCase { @@ -95,14 +98,55 @@ public void testEnsuredAuthenticatedUserIsSame() throws IOException { assertFalse(indexService.ensureAuthenticatedUserIsSame(threadContext.getHeaders(), runAsDiffType)); } - public void testSettings() throws ExecutionException, InterruptedException { - PlainActionFuture future = PlainActionFuture.newFuture(); - indexService.createIndexIfNecessary(future); - future.get(); + public void testAutoCreateIndex() throws Exception { + { + PlainActionFuture future = PlainActionFuture.newFuture(); + indexService.createIndexIfNecessary(future); + future.get(); + assertSettings(); + } + AcknowledgedResponse ack = client().admin().indices().prepareDelete(index).get(); + assertTrue(ack.isAcknowledged()); + + AsyncExecutionId id = new AsyncExecutionId("0", new TaskId("N/A", 0)); + AsyncSearchResponse resp = new AsyncSearchResponse(id.getEncoded(), true, true, 0L, 0L); + { + PlainActionFuture future = PlainActionFuture.newFuture(); + indexService.createResponse(id.getDocId(), Collections.emptyMap(), resp, future); + future.get(); + assertSettings(); + } + ack = client().admin().indices().prepareDelete(index).get(); + assertTrue(ack.isAcknowledged()); + { + PlainActionFuture future = PlainActionFuture.newFuture(); + indexService.deleteResponse(id, future); + future.get(); + assertSettings(); + } + ack = client().admin().indices().prepareDelete(index).get(); + assertTrue(ack.isAcknowledged()); + { + PlainActionFuture future = PlainActionFuture.newFuture(); + indexService.updateResponse(id.getDocId(), Collections.emptyMap(), resp, future); + expectThrows(Exception.class, () -> future.get()); + assertSettings(); + } + ack = client().admin().indices().prepareDelete(index).get(); + assertTrue(ack.isAcknowledged()); + { + PlainActionFuture future = PlainActionFuture.newFuture(); + indexService.updateExpirationTime("0", 10L, future); + expectThrows(Exception.class, () -> future.get()); + assertSettings(); + } + } + + private void assertSettings() throws IOException { GetIndexResponse getIndexResponse = client().admin().indices().getIndex( new GetIndexRequest().indices(index)).actionGet(); Settings settings = getIndexResponse.getSettings().get(index); - assertEquals("1", settings.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS)); - assertEquals("0-1", settings.get(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS)); + Settings expected = AsyncTaskIndexService.settings(); + assertEquals(expected, settings.filter(key -> expected.hasValue(key))); } }