Skip to content

Commit

Permalink
Create async search index if necessary on updates and deletes (#64606)
Browse files Browse the repository at this point in the history
This change ensures that we create the async search index with the right mappings and settings when updating or deleting a document. Users can delete the async search index at any time so we have to re-create it internally if necessary before applying any new operation.
  • Loading branch information
jimczi committed Dec 2, 2020
1 parent bd159d8 commit 1c34507
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,16 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
public static final String EXPIRATION_TIME_FIELD = "expiration_time";
public static final String RESULT_FIELD = "result";

private static Settings settings() {
static Settings settings() {
return Settings.builder()
.put("index.codec", "best_compression")
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
.build();
}

private static XContentBuilder mappings() throws IOException {
static XContentBuilder mappings() throws IOException {
XContentBuilder builder = jsonBuilder()
.startObject()
.startObject(SINGLE_MAPPING_NAME)
Expand Down Expand Up @@ -195,7 +197,9 @@ public void updateResponse(String docId,
.id(docId)
.doc(source, XContentType.JSON)
.retryOnConflict(5);
client.update(request, listener);
// updates create the index automatically if it doesn't exist so we force the creation
// preemptively.
createIndexIfNecessary(ActionListener.wrap(v -> client.update(request, listener), listener::onFailure));
} catch(Exception e) {
listener.onFailure(e);
}
Expand All @@ -213,7 +217,9 @@ public void updateExpirationTime(String docId,
.id(docId)
.doc(source, XContentType.JSON)
.retryOnConflict(5);
client.update(request, listener);
// updates create the index automatically if it doesn't exist so we force the creation
// preemptively.
createIndexIfNecessary(ActionListener.wrap(v -> client.update(request, listener), listener::onFailure));
}

/**
Expand All @@ -223,7 +229,9 @@ public void deleteResponse(AsyncExecutionId asyncExecutionId,
ActionListener<DeleteResponse> listener) {
try {
DeleteRequest request = new DeleteRequest(index).id(asyncExecutionId.getDocId());
client.delete(request, listener);
// deletes create the index automatically if it doesn't exist so we force the creation
// preemptively.
createIndexIfNecessary(ActionListener.wrap(v -> client.delete(request, listener), listener::onFailure));
} catch(Exception e) {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@

import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
Expand All @@ -21,7 +25,6 @@

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.ExecutionException;

// TODO: test CRUD operations
public class AsyncTaskServiceTests extends ESSingleNodeTestCase {
Expand Down Expand Up @@ -95,14 +98,55 @@ public void testEnsuredAuthenticatedUserIsSame() throws IOException {
assertFalse(indexService.ensureAuthenticatedUserIsSame(threadContext.getHeaders(), runAsDiffType));
}

public void testSettings() throws ExecutionException, InterruptedException {
PlainActionFuture<Void> future = PlainActionFuture.newFuture();
indexService.createIndexIfNecessary(future);
future.get();
public void testAutoCreateIndex() throws Exception {
{
PlainActionFuture<Void> future = PlainActionFuture.newFuture();
indexService.createIndexIfNecessary(future);
future.get();
assertSettings();
}
AcknowledgedResponse ack = client().admin().indices().prepareDelete(index).get();
assertTrue(ack.isAcknowledged());

AsyncExecutionId id = new AsyncExecutionId("0", new TaskId("N/A", 0));
AsyncSearchResponse resp = new AsyncSearchResponse(id.getEncoded(), true, true, 0L, 0L);
{
PlainActionFuture<IndexResponse> future = PlainActionFuture.newFuture();
indexService.createResponse(id.getDocId(), Collections.emptyMap(), resp, future);
future.get();
assertSettings();
}
ack = client().admin().indices().prepareDelete(index).get();
assertTrue(ack.isAcknowledged());
{
PlainActionFuture<DeleteResponse> future = PlainActionFuture.newFuture();
indexService.deleteResponse(id, future);
future.get();
assertSettings();
}
ack = client().admin().indices().prepareDelete(index).get();
assertTrue(ack.isAcknowledged());
{
PlainActionFuture<UpdateResponse> future = PlainActionFuture.newFuture();
indexService.updateResponse(id.getDocId(), Collections.emptyMap(), resp, future);
expectThrows(Exception.class, () -> future.get());
assertSettings();
}
ack = client().admin().indices().prepareDelete(index).get();
assertTrue(ack.isAcknowledged());
{
PlainActionFuture<UpdateResponse> future = PlainActionFuture.newFuture();
indexService.updateExpirationTime("0", 10L, future);
expectThrows(Exception.class, () -> future.get());
assertSettings();
}
}

private void assertSettings() throws IOException {
GetIndexResponse getIndexResponse = client().admin().indices().getIndex(
new GetIndexRequest().indices(index)).actionGet();
Settings settings = getIndexResponse.getSettings().get(index);
assertEquals("1", settings.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS));
assertEquals("0-1", settings.get(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS));
Settings expected = AsyncTaskIndexService.settings();
assertEquals(expected, settings.filter(key -> expected.hasValue(key)));
}
}

0 comments on commit 1c34507

Please sign in to comment.