Skip to content

Commit

Permalink
Introduce an IndexSettingsProvider to inject logsdb index mode (ela…
Browse files Browse the repository at this point in the history
…stic#113505)

Here we introduce a new implementation of `IndexSettingProvider` whose goal is to "inject" the
`index.mode` setting with value `logsdb` when a cluster setting `cluster.logsdb.enabled` is `true`.
We also make sure that:
* the existing `index.mode` is not set
* the datastream name matches the `logs-*-*` pattern
* `logs@settings` component template is used
  • Loading branch information
salvatore-campagna committed Sep 26, 2024
1 parent cbce0a6 commit d0dabea
Show file tree
Hide file tree
Showing 13 changed files with 648 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import org.elasticsearch.client.RestClient;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.hamcrest.Matchers;
Expand All @@ -23,6 +24,22 @@

public class LogsIndexModeDisabledRestTestIT extends LogsIndexModeRestTestIT {

private static final String MAPPINGS = """
{
"template": {
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"message": {
"type": "text"
}
}
}
}
}""";

@ClassRule()
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
Expand Down Expand Up @@ -50,8 +67,59 @@ public void setup() throws Exception {

public void testLogsSettingsIndexModeDisabled() throws IOException {
assertOK(createDataStream(client, "logs-custom-dev"));
final String indexMode = (String) getSetting(client, getDataStreamBackingIndex(client, "logs-custom-dev", 0), "index.mode");
final String indexMode = (String) getSetting(
client,
getDataStreamBackingIndex(client, "logs-custom-dev", 0),
IndexSettings.MODE.getKey()
);
assertThat(indexMode, Matchers.not(equalTo(IndexMode.LOGSDB.getName())));
}

public void testTogglingLogsdb() throws IOException {
putComponentTemplate(client, "logs@settings", MAPPINGS);
assertOK(createDataStream(client, "logs-custom-dev"));
final String indexModeBefore = (String) getSetting(
client,
getDataStreamBackingIndex(client, "logs-custom-dev", 0),
IndexSettings.MODE.getKey()
);
assertThat(indexModeBefore, Matchers.not(equalTo(IndexMode.LOGSDB.getName())));
assertOK(putClusterSetting(client, "cluster.logsdb.enabled", "true"));
final String indexModeAfter = (String) getSetting(
client,
getDataStreamBackingIndex(client, "logs-custom-dev", 0),
IndexSettings.MODE.getKey()
);
assertThat(indexModeAfter, Matchers.not(equalTo(IndexMode.LOGSDB.getName())));
assertOK(rolloverDataStream(client, "logs-custom-dev"));
final String indexModeLater = (String) getSetting(
client,
getDataStreamBackingIndex(client, "logs-custom-dev", 1),
IndexSettings.MODE.getKey()
);
assertThat(indexModeLater, equalTo(IndexMode.LOGSDB.getName()));
assertOK(putClusterSetting(client, "cluster.logsdb.enabled", "false"));
assertOK(rolloverDataStream(client, "logs-custom-dev"));
final String indexModeFinal = (String) getSetting(
client,
getDataStreamBackingIndex(client, "logs-custom-dev", 2),
IndexSettings.MODE.getKey()
);
assertThat(indexModeFinal, Matchers.not(equalTo(IndexMode.LOGSDB.getName())));

}

public void testEnablingLogsdb() throws IOException {
putComponentTemplate(client, "logs@settings", MAPPINGS);
assertOK(putClusterSetting(client, "cluster.logsdb.enabled", true));
assertOK(createDataStream(client, "logs-custom-dev"));
final String indexMode = (String) getSetting(
client,
getDataStreamBackingIndex(client, "logs-custom-dev", 0),
IndexSettings.MODE.getKey()
);
assertThat(indexMode, equalTo(IndexMode.LOGSDB.getName()));
assertOK(putClusterSetting(client, "cluster.logsdb.enabled", false));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
package org.elasticsearch.datastreams.logsdb;

import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.hamcrest.Matchers;
Expand Down Expand Up @@ -179,7 +181,11 @@ public void setup() throws Exception {
public void testCreateDataStream() throws IOException {
assertOK(putComponentTemplate(client, "logs@custom", MAPPINGS));
assertOK(createDataStream(client, "logs-custom-dev"));
final String indexMode = (String) getSetting(client, getDataStreamBackingIndex(client, "logs-custom-dev", 0), "index.mode");
final String indexMode = (String) getSetting(
client,
getDataStreamBackingIndex(client, "logs-custom-dev", 0),
IndexSettings.MODE.getKey()
);
assertThat(indexMode, equalTo(IndexMode.LOGSDB.getName()));
}

Expand Down Expand Up @@ -224,4 +230,83 @@ public void testRolloverDataStream() throws IOException {
assertThat(firstBackingIndex, Matchers.not(equalTo(secondBackingIndex)));
assertThat(getDataStreamBackingIndices(client, "logs-custom-dev").size(), equalTo(2));
}

public void testLogsAtSettingWithStandardOverride() throws IOException {
assertOK(putComponentTemplate(client, "logs@custom", """
{
"template": {
"settings": {
"index": {
"mode": "standard"
}
}
}
}
"""));
assertOK(createDataStream(client, "logs-custom-dev"));
final String indexMode = (String) getSetting(
client,
getDataStreamBackingIndex(client, "logs-custom-dev", 0),
IndexSettings.MODE.getKey()
);
assertThat(indexMode, equalTo(IndexMode.STANDARD.getName()));
}

public void testLogsAtSettingWithTimeSeriesOverride() throws IOException {
assertOK(putComponentTemplate(client, "logs@custom", """
{
"template": {
"settings": {
"index": {
"routing_path": [ "hostname" ],
"mode": "time_series",
"sort.field": [],
"sort.order": []
}
},
"mappings": {
"properties": {
"hostname": {
"type": "keyword",
"time_series_dimension": true
}
}
}
}
}
"""));
assertOK(createDataStream(client, "logs-custom-dev"));
final String indexMode = (String) getSetting(
client,
getDataStreamBackingIndex(client, "logs-custom-dev", 0),
IndexSettings.MODE.getKey()
);
assertThat(indexMode, equalTo(IndexMode.TIME_SERIES.getName()));
}

public void testLogsAtSettingWithTimeSeriesOverrideFailure() {
// NOTE: apm@settings defines sorting on @timestamp and template composition results in index.mode "time_series"
// with a non-allowed index.sort.field '@timestamp'. This fails at template composition stage before the index is even created.
final ResponseException ex = assertThrows(ResponseException.class, () -> putComponentTemplate(client, "logs@custom", """
{
"template": {
"settings": {
"index": {
"routing_path": [ "hostname" ],
"mode": "time_series"
}
},
"mappings": {
"properties": {
"hostname": {
"type": "keyword",
"time_series_dimension": true
}
}
}
}
}
"""));
assertTrue(ex.getMessage().contains("[index.mode=time_series] is incompatible with [index.sort.field]"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,16 @@ protected static void waitForLogs(RestClient client) throws Exception {
});
}

protected static Response putComponentTemplate(final RestClient client, final String templateName, final String mappings)
protected static Response putComponentTemplate(final RestClient client, final String componentTemplate, final String contends)
throws IOException {
final Request request = new Request("PUT", "/_component_template/" + templateName);
request.setJsonEntity(mappings);
final Request request = new Request("PUT", "/_component_template/" + componentTemplate);
request.setJsonEntity(contends);
return client.performRequest(request);
}

protected static Response putTemplate(final RestClient client, final String template, final String contents) throws IOException {
final Request request = new Request("PUT", "/_index_template/" + template);
request.setJsonEntity(contents);
return client.performRequest(request);
}

Expand Down Expand Up @@ -87,4 +93,11 @@ protected static Response bulkIndex(final RestClient client, final String dataSt
bulkRequest.addParameter("refresh", "true");
return client.performRequest(bulkRequest);
}

protected static Response putClusterSetting(final RestClient client, final String settingName, final Object settingValue)
throws IOException {
final Request request = new Request("PUT", "/_cluster/settings");
request.setJsonEntity("{ \"transient\": { \"" + settingName + "\": " + settingValue + " } }");
return client.performRequest(request);
}
}
1 change: 1 addition & 0 deletions x-pack/plugin/core/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@
exports org.elasticsearch.xpack.core.watcher.trigger;
exports org.elasticsearch.xpack.core.watcher.watch;
exports org.elasticsearch.xpack.core.watcher;
exports org.elasticsearch.xpack.cluster.settings;

provides org.elasticsearch.action.admin.cluster.node.info.ComponentVersionNumber
with
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.cluster.settings;

import org.elasticsearch.common.settings.Setting;

public class ClusterSettings {
public static final Setting<Boolean> CLUSTER_LOGSDB_ENABLED = Setting.boolSetting(
"cluster.logsdb.enabled",
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"template": {
"settings": {
"index": {
"lifecycle": {
"name": "logs"
},
"mode": "logsdb",
"codec": "best_compression",
"mapping": {
"ignore_malformed": true,
"total_fields": {
"ignore_dynamic_beyond_limit": true
}
},
"default_pipeline": "logs@default-pipeline"
}
}
},
"_meta": {
"description": "default settings for the logs index template installed by x-pack",
"managed": true
},
"version": ${xpack.stack.template.version},
"deprecated": ${xpack.stack.template.deprecated}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
"lifecycle": {
"name": "logs"
},
"mode": "${xpack.stack.template.logsdb.index.mode}",
"codec": "best_compression",
"mapping": {
"ignore_malformed": true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,31 @@
import java.util.Collection;
import java.util.List;

import static org.elasticsearch.xpack.cluster.settings.ClusterSettings.CLUSTER_LOGSDB_ENABLED;
import static org.elasticsearch.xpack.logsdb.SyntheticSourceLicenseService.FALLBACK_SETTING;

public class LogsDBPlugin extends Plugin {

private final Settings settings;
private final SyntheticSourceLicenseService licenseService;

private final LogsdbIndexModeSettingsProvider logsdbIndexModeSettingsProvider;

public LogsDBPlugin(Settings settings) {
this.settings = settings;
this.licenseService = new SyntheticSourceLicenseService(settings);
this.logsdbIndexModeSettingsProvider = new LogsdbIndexModeSettingsProvider(settings);
}

@Override
public Collection<?> createComponents(PluginServices services) {
licenseService.setLicenseState(XPackPlugin.getSharedLicenseState());
var clusterSettings = services.clusterService().getClusterSettings();
clusterSettings.addSettingsUpdateConsumer(FALLBACK_SETTING, licenseService::setSyntheticSourceFallback);
clusterSettings.addSettingsUpdateConsumer(
CLUSTER_LOGSDB_ENABLED,
logsdbIndexModeSettingsProvider::updateClusterIndexModeLogsdbEnabled
);
// Nothing to share here:
return super.createComponents(services);
}
Expand All @@ -43,11 +51,11 @@ public Collection<IndexSettingProvider> getAdditionalIndexSettingProviders(Index
if (DiscoveryNode.isStateless(settings)) {
return List.of();
}
return List.of(new SyntheticSourceIndexSettingsProvider(licenseService));
return List.of(new SyntheticSourceIndexSettingsProvider(licenseService), logsdbIndexModeSettingsProvider);
}

@Override
public List<Setting<?>> getSettings() {
return List.of(FALLBACK_SETTING);
return List.of(FALLBACK_SETTING, CLUSTER_LOGSDB_ENABLED);
}
}
Loading

0 comments on commit d0dabea

Please sign in to comment.