Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create async search index if necessary on updates and deletes #64606

Merged
merged 12 commits into from
Dec 2, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -1302,7 +1302,7 @@ protected static boolean isXPackTemplate(String name) {
case ".logstash-management":
case "security_audit_log":
case ".slm-history":
case ".async-search":
case "async-search":
case "saml-service-provider":
case "logs":
case "logs-settings":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
import org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService;
import org.elasticsearch.xpack.core.async.AsyncTaskTemplateRegistry;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;

import java.util.ArrayList;
Expand Down Expand Up @@ -62,6 +63,15 @@ public Collection<Object> createComponents(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
AsyncTaskTemplateRegistry indexTemplateRegistry = new AsyncTaskTemplateRegistry(
settings,
clusterService,
threadPool,
client,
xContentRegistry
);
indexTemplateRegistry.initialize();

List<Object> components = new ArrayList<>();
if (DiscoveryNode.isDataNode(environment.settings())) {
// only data nodes should be eligible to run the maintenance service.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
Expand All @@ -21,7 +19,6 @@
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
Expand All @@ -31,9 +28,7 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
Expand All @@ -52,8 +47,6 @@
import java.util.Map;
import java.util.function.BiFunction;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.xpack.core.security.authc.AuthenticationField.AUTHENTICATION_KEY;

/**
Expand All @@ -67,45 +60,7 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
public static final String EXPIRATION_TIME_FIELD = "expiration_time";
public static final String RESULT_FIELD = "result";

private static Settings settings() {
return Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
.build();
}

private static XContentBuilder mappings() throws IOException {
XContentBuilder builder = jsonBuilder()
.startObject()
.startObject(SINGLE_MAPPING_NAME)
.startObject("_meta")
.field("version", Version.CURRENT)
.endObject()
.field("dynamic", "strict")
.startObject("properties")
.startObject(HEADERS_FIELD)
.field("type", "object")
.field("enabled", "false")
.endObject()
.startObject(RESPONSE_HEADERS_FIELD)
.field("type", "object")
.field("enabled", "false")
.endObject()
.startObject(RESULT_FIELD)
.field("type", "object")
.field("enabled", "false")
.endObject()
.startObject(EXPIRATION_TIME_FIELD)
.field("type", "long")
.endObject()
.endObject()
.endObject()
.endObject();
return builder;
}

private final String index;
private final ClusterService clusterService;
private final Client client;
private final SecurityContext securityContext;
private final NamedWriteableRegistry registry;
Expand All @@ -120,7 +75,6 @@ public AsyncTaskIndexService(String index,
Writeable.Reader<R> reader,
NamedWriteableRegistry registry) {
this.index = index;
this.clusterService = clusterService;
this.securityContext = new SecurityContext(clusterService.getSettings(), threadContext);
this.client = new OriginSettingClient(client, origin);
this.registry = registry;
Expand All @@ -134,34 +88,6 @@ public Client getClient() {
return client;
}

/**
* Creates the index with the expected settings and mappings if it doesn't exist.
*/
void createIndexIfNecessary(ActionListener<Void> listener) {
if (clusterService.state().routingTable().hasIndex(index) == false) {
try {
client.admin().indices().prepareCreate(index)
.setSettings(settings())
.setMapping(mappings())
.execute(ActionListener.wrap(
resp -> listener.onResponse(null),
exc -> {
if (ExceptionsHelper.unwrapCause(exc) instanceof ResourceAlreadyExistsException) {
listener.onResponse(null);
} else {
logger.error("failed to create " + index + " index", exc);
listener.onFailure(exc);
}
}));
} catch (Exception exc) {
logger.error("failed to create " + index + " index", exc);
listener.onFailure(exc);
}
} else {
listener.onResponse(null);
}
}

/**
* Stores the initial response with the original headers of the authenticated user
* and the expected expiration time.
Expand All @@ -178,7 +104,7 @@ public void createResponse(String docId,
.create(true)
.id(docId)
.source(source, XContentType.JSON);
createIndexIfNecessary(ActionListener.wrap(v -> client.index(indexRequest, listener), listener::onFailure));
client.index(indexRequest, listener);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.async;

import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.template.IndexTemplateConfig;
import org.elasticsearch.xpack.core.template.IndexTemplateRegistry;

import java.util.Collections;
import java.util.List;

/**
* The {@link AsyncTaskTemplateRegistry} class sets up the index template for the .async-search index.
*/
public class AsyncTaskTemplateRegistry extends IndexTemplateRegistry {
public static final int INDEX_TEMPLATE_VERSION = 0;

public static final String ASYNC_TASK_TEMPLATE_VERSION_VARIABLE = "xpack.async_search.template.version";
public static final String ASYNC_SEARCH_TEMPLATE_NAME = "async-search";

public static final IndexTemplateConfig TEMPLATE_ASYNC_SEARCH = new IndexTemplateConfig(
ASYNC_SEARCH_TEMPLATE_NAME,
"/async-search.json",
INDEX_TEMPLATE_VERSION,
ASYNC_TASK_TEMPLATE_VERSION_VARIABLE
);

public AsyncTaskTemplateRegistry(Settings nodeSettings, ClusterService clusterService,
ThreadPool threadPool, Client client,
NamedXContentRegistry xContentRegistry) {
super(nodeSettings, clusterService, threadPool, client, xContentRegistry);
}

@Override
protected boolean requiresMasterNode() {
return true;
}

@Override
protected List<IndexTemplateConfig> getComposableTemplateConfigs() {
return Collections.singletonList(TEMPLATE_ASYNC_SEARCH);
}

@Override
protected String getOrigin() {
return ClientHelper.INDEX_LIFECYCLE_ORIGIN;
jimczi marked this conversation as resolved.
Show resolved Hide resolved
}

public boolean validate(ClusterState state) {
boolean allTemplatesPresent = getComposableTemplateConfigs().stream()
.map(IndexTemplateConfig::getTemplateName)
.allMatch(name -> state.metadata().templatesV2().containsKey(name));

return allTemplatesPresent;
}
}
38 changes: 38 additions & 0 deletions x-pack/plugin/core/src/main/resources/async-search.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"index_patterns": [
".async-search"
],
"template": {
"settings": {
"index.codec": "best_compression",
"index.number_of_shards": 1,
"index.number_of_replicas": 0,
"index.auto_expand_replicas": "0-1"
},
"mappings": {
"dynamic": false,
"properties": {
"headers": {
"type": "object",
"enabled": false
},
"response_headers": {
"type": "object",
"enabled": false
},
"result": {
"type": "object",
"enabled": false
},
"expiration_time": {
"type": "long"
}
}
}
},
"_meta": {
"description": "index template for async search indices"
},
"priority": 2147483647,
"version": ${xpack.async_search.template.version}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@
*/
package org.elasticsearch.xpack.core.async;

import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
Expand All @@ -21,7 +17,6 @@

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.ExecutionException;

// TODO: test CRUD operations
public class AsyncTaskServiceTests extends ESSingleNodeTestCase {
Expand Down Expand Up @@ -94,15 +89,4 @@ public void testEnsuredAuthenticatedUserIsSame() throws IOException {
assertFalse(indexService.ensureAuthenticatedUserIsSame(original, runAsDiffType));
assertFalse(indexService.ensureAuthenticatedUserIsSame(threadContext.getHeaders(), runAsDiffType));
}

public void testSettings() throws ExecutionException, InterruptedException {
PlainActionFuture<Void> future = PlainActionFuture.newFuture();
indexService.createIndexIfNecessary(future);
future.get();
GetIndexResponse getIndexResponse = client().admin().indices().getIndex(
new GetIndexRequest().indices(index)).actionGet();
Settings settings = getIndexResponse.getSettings().get(index);
assertEquals("1", settings.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS));
assertEquals("0-1", settings.get(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS));
}
}
Loading