Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add inline template_content support to the opensearch sink #3431

Merged
merged 2 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@

package org.opensearch.dataprepper.plugins.sink.opensearch.index;

import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.EnumUtils;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.plugins.sink.opensearch.DistributionVersion;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkAction;
import org.opensearch.dataprepper.plugins.sink.opensearch.s3.FileReader;
import org.opensearch.dataprepper.plugins.sink.opensearch.s3.S3ClientProvider;
import org.opensearch.dataprepper.plugins.sink.opensearch.s3.S3FileReader;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.arns.Arn;
Expand All @@ -27,8 +27,8 @@
import java.io.InputStream;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

Expand All @@ -44,6 +44,7 @@ public class IndexConfiguration {
public static final String INDEX_TYPE = "index_type";
public static final String TEMPLATE_TYPE = "template_type";
public static final String TEMPLATE_FILE = "template_file";
public static final String TEMPLATE_CONTENT = "template_content";
public static final String NUM_SHARDS = "number_of_shards";
public static final String NUM_REPLICAS = "number_of_replicas";
public static final String BULK_SIZE = "bulk_size";
Expand Down Expand Up @@ -105,7 +106,8 @@ private IndexConfiguration(final Builder builder) {
this.s3Client = builder.s3Client;

determineTemplateType(builder);
this.indexTemplate = readIndexTemplate(builder.templateFile, indexType, templateType);

this.indexTemplate = builder.templateContent != null ? readTemplateContent(builder.templateContent) : readIndexTemplate(builder.templateFile, indexType, templateType);

if (builder.numReplicas > 0) {
indexTemplate.putIfAbsent(SETTINGS, new HashMap<>());
Expand Down Expand Up @@ -187,6 +189,16 @@ public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetti
if (templateFile != null) {
builder = builder.withTemplateFile(templateFile);
}

final String templateContent = pluginSetting.getStringOrDefault(TEMPLATE_CONTENT, null);
if (templateContent != null) {
builder = builder.withTemplateContent(templateContent);
}

if (templateContent != null && templateFile != null) {
LOG.warn("Both template_content and template_file are configured. Only template_content will be used");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was there a particular reason you have the content take precedence over the file?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessarily but if anything i am figuring the template_content will be more widely used

}

builder = builder.withNumShards(pluginSetting.getIntegerOrDefault(NUM_SHARDS, 0));
builder = builder.withNumReplicas(pluginSetting.getIntegerOrDefault(NUM_REPLICAS, 0));
final Long batchSize = pluginSetting.getLongOrDefault(BULK_SIZE, DEFAULT_BULK_SIZE);
Expand Down Expand Up @@ -365,6 +377,7 @@ private Map<String, Object> readIndexTemplate(final String templateFile, final I
templateURL = new File(templateFile).toURI().toURL();
}
}

if (templateURL != null) {
return new ObjectMapper().readValue(templateURL, new TypeReference<Map<String, Object>>() {
});
Expand All @@ -379,6 +392,14 @@ private Map<String, Object> readIndexTemplate(final String templateFile, final I
}
}

private Map<String, Object> readTemplateContent(final String templateContent) {
try {
return OBJECT_MAPPER.readValue(templateContent, new TypeReference<Map<String, Object>>() {});
} catch (IOException ex) {
throw new InvalidPluginConfigurationException(String.format("template_content is invalid: %s", ex.getMessage()));
}
}

private URL loadExistingTemplate(TemplateType templateType, String predefinedTemplateName) {
String resourcePath = templateType == TemplateType.V1 ? predefinedTemplateName : templateType.getTypeName() + "/" + predefinedTemplateName;
return getClass().getClassLoader()
Expand All @@ -390,6 +411,7 @@ public static class Builder {
private String indexType;
private TemplateType templateType;
private String templateFile;
private String templateContent;
private int numShards;
private int numReplicas;
private String routingField;
Expand Down Expand Up @@ -437,6 +459,12 @@ public Builder withTemplateFile(final String templateFile) {
return this;
}

public Builder withTemplateContent(final String templateContent) {
checkArgument(templateContent != null, "templateContent cannot be null.");
this.templateContent = templateContent;
return this;
}

public Builder withDocumentIdField(final String documentIdField) {
checkNotNull(documentIdField, "document_id_field cannot be null");
this.documentIdField = documentIdField;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@

package org.opensearch.dataprepper.plugins.sink.opensearch.index;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.plugins.sink.opensearch.DistributionVersion;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.http.AbortableInputStream;
Expand All @@ -34,6 +38,7 @@
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
Expand All @@ -52,6 +57,8 @@

@SuppressWarnings("unchecked")
public class IndexConfigurationTests {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private static final String DEFAULT_TEMPLATE_FILE = "test-template-withshards.json";
private static final String TEST_CUSTOM_INDEX_POLICY_FILE = "test-custom-index-policy-file.json";

Expand Down Expand Up @@ -193,7 +200,7 @@ public void testValidCustom_from_s3() {
}

@Test
public void testValidCustomWithNoTemplateFile() throws MalformedURLException {
public void testValidCustomWithNoTemplateFile() {
final String testIndexAlias = "foo";
IndexConfiguration indexConfiguration = new IndexConfiguration.Builder()
.withIndexAlias(testIndexAlias)
Expand Down Expand Up @@ -263,6 +270,43 @@ public void testValidCustomWithTemplateFileAndShards() {
assertEquals(-1, indexConfiguration.getBulkSize());
}

@Test
public void testValidCustomWithTemplateContent() throws JsonProcessingException {
final String testIndexAlias = "test";
IndexConfiguration indexConfiguration = new IndexConfiguration.Builder()
.withIndexAlias(testIndexAlias)
.withTemplateContent(createTemplateContent())
.withBulkSize(10)
.build();

assertEquals(IndexType.CUSTOM, indexConfiguration.getIndexType());
assertEquals(testIndexAlias, indexConfiguration.getIndexAlias());
assertEquals(10, indexConfiguration.getBulkSize());
assertFalse(indexConfiguration.getIndexTemplate().isEmpty());
assertThat(indexConfiguration.getIndexTemplate(), equalTo(OBJECT_MAPPER.readValue(createTemplateContent(), new TypeReference<>() {})));
}

@Test
public void readIndexConfigWithTemplateFileAndTemplateContentUsesTemplateContent() throws JsonProcessingException {
final PluginSetting pluginSetting = generatePluginSetting("custom", "test", "test-file", createTemplateContent(), null, null, null);

final IndexConfiguration objectUnderTest = IndexConfiguration.readIndexConfig(pluginSetting);

assertThat(objectUnderTest, notNullValue());
assertThat(objectUnderTest.getIndexTemplate(), notNullValue());
assertThat(objectUnderTest.getIndexTemplate(), equalTo(OBJECT_MAPPER.readValue(createTemplateContent(), new TypeReference<>() {})));
}

@Test
public void invalidTemplateContentThrowsInvalidPluginConfigurationException() {
final String invalidTemplateContent = UUID.randomUUID().toString();

final PluginSetting pluginSetting = generatePluginSetting("custom", null, null, invalidTemplateContent, null, null, null);

assertThrows(InvalidPluginConfigurationException.class, () -> IndexConfiguration.readIndexConfig(pluginSetting));

}

@Test
public void testInvalidCustom() {
// Missing index alias
Expand All @@ -274,7 +318,7 @@ public void testInvalidCustom() {
@Test
public void testReadIndexConfig_RawIndexType() {
final Map<String, Object> metadata = initializeConfigMetaData(
IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null, null, null, null);
IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null, null, null, null, null);
final PluginSetting pluginSetting = getPluginSetting(metadata);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
final URL expTemplateFile = indexConfiguration
Expand All @@ -292,15 +336,15 @@ public void testReadIndexConfig_RawIndexType() {
@Test
public void testReadIndexConfig_InvalidIndexTypeValueString() {
final Map<String, Object> metadata = initializeConfigMetaData(
"i-am-an-illegitimate-index-type", null, null, null, null, null);
"i-am-an-illegitimate-index-type", null, null, null, null, null, null);
final PluginSetting pluginSetting = getPluginSetting(metadata);
assertThrows(IllegalArgumentException.class, () -> IndexConfiguration.readIndexConfig(pluginSetting));
}

@Test
public void testReadIndexConfig_ServiceMapIndexType() {
final Map<String, Object> metadata = initializeConfigMetaData(
IndexType.TRACE_ANALYTICS_SERVICE_MAP.getValue(), null, null, null, null, null);
IndexType.TRACE_ANALYTICS_SERVICE_MAP.getValue(), null, null, null, null, null, null);
final PluginSetting pluginSetting = getPluginSetting(metadata);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
final URL expTemplateFile = indexConfiguration
Expand All @@ -324,7 +368,7 @@ public void testReadIndexConfigCustom() {
final long testFlushTimeout = 30_000L;
final String testIdField = "someId";
final PluginSetting pluginSetting = generatePluginSetting(
null, testIndexAlias, defaultTemplateFilePath, testBulkSize, testFlushTimeout, testIdField);
null, testIndexAlias, defaultTemplateFilePath, null, testBulkSize, testFlushTimeout, testIdField);
pluginSetting.getSettings().put(IndexConfiguration.ESTIMATE_BULK_SIZE_USING_COMPRESSION, true);
pluginSetting.getSettings().put(IndexConfiguration.MAX_LOCAL_COMPRESSIONS_FOR_ESTIMATION, 5);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
Expand All @@ -348,7 +392,7 @@ public void testReadIndexConfig_ExplicitCustomIndexType() {
final long testFlushTimeout = 30_000L;
final String testIdField = "someId";
final Map<String, Object> metadata = initializeConfigMetaData(
testIndexType, testIndexAlias, defaultTemplateFilePath, testBulkSize, testFlushTimeout, testIdField);
testIndexType, testIndexAlias, defaultTemplateFilePath, null, testBulkSize, testFlushTimeout, testIdField);
final PluginSetting pluginSetting = getPluginSetting(metadata);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
assertEquals(IndexType.CUSTOM, indexConfiguration.getIndexType());
Expand All @@ -363,7 +407,7 @@ public void testReadIndexConfig_ExplicitCustomIndexType() {
public void testReadIndexConfig_awsOptionServerlessDefault() {
final String testIndexAlias = "foo";
final Map<String, Object> metadata = initializeConfigMetaData(
null, testIndexAlias, null, null, null, null);
null, testIndexAlias, null, null, null, null, null);
metadata.put(AWS_OPTION, Map.of(SERVERLESS, true));
final PluginSetting pluginSetting = getPluginSetting(metadata);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
Expand All @@ -375,7 +419,7 @@ public void testReadIndexConfig_awsOptionServerlessDefault() {
public void testReadIndexConfig_awsServerlessIndexTypeOverride() {
final String testIndexAlias = "foo";
final Map<String, Object> metadata = initializeConfigMetaData(
IndexType.CUSTOM.getValue(), testIndexAlias, null, null, null, null);
IndexType.CUSTOM.getValue(), testIndexAlias, null, null, null, null, null);
metadata.put(AWS_OPTION, Map.of(SERVERLESS, true));
final PluginSetting pluginSetting = getPluginSetting(metadata);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
Expand All @@ -387,7 +431,7 @@ public void testReadIndexConfig_awsServerlessIndexTypeOverride() {
@Test
public void testReadIndexConfig_distributionVersionDefault() {
final Map<String, Object> metadata = initializeConfigMetaData(
null, "foo", null, null, null, null);
null, "foo", null,null, null, null, null);
final PluginSetting pluginSetting = getPluginSetting(metadata);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
assertEquals(indexConfiguration.getDistributionVersion(), DistributionVersion.DEFAULT);
Expand All @@ -396,7 +440,7 @@ public void testReadIndexConfig_distributionVersionDefault() {
@Test
public void testReadIndexConfig_es6Override() {
final Map<String, Object> metadata = initializeConfigMetaData(
null, "foo", null, null, null, null);
null, "foo", null, null, null, null, null);
metadata.put(DISTRIBUTION_VERSION, "es6");
metadata.put(TEMPLATE_TYPE, TemplateType.INDEX_TEMPLATE.getTypeName());
final PluginSetting pluginSetting = getPluginSetting(metadata);
Expand All @@ -409,7 +453,7 @@ public void testReadIndexConfig_es6Override() {
@Test
public void testReadIndexConfig_documentRootKey() {
final Map<String, Object> metadata = initializeConfigMetaData(
IndexType.CUSTOM.getValue(), "foo", null, null, null, null);
IndexType.CUSTOM.getValue(), "foo", null, null, null, null, null);
final String expectedRootKey = UUID.randomUUID().toString();
metadata.put(DOCUMENT_ROOT_KEY, expectedRootKey);
final PluginSetting pluginSetting = getPluginSetting(metadata);
Expand All @@ -420,7 +464,7 @@ public void testReadIndexConfig_documentRootKey() {
@Test
public void testReadIndexConfig_emptyDocumentRootKey() {
final Map<String, Object> metadata = initializeConfigMetaData(
IndexType.CUSTOM.getValue(), "foo", null, null, null, null);
IndexType.CUSTOM.getValue(), "foo", null, null, null, null, null);
metadata.put(DOCUMENT_ROOT_KEY, "");
final PluginSetting pluginSetting = getPluginSetting(metadata);
assertThrows(IllegalArgumentException.class, () -> IndexConfiguration.readIndexConfig(pluginSetting));
Expand All @@ -429,7 +473,7 @@ public void testReadIndexConfig_emptyDocumentRootKey() {
@Test
void getTemplateType_defaults_to_V1() {
final Map<String, Object> metadata = initializeConfigMetaData(
IndexType.CUSTOM.getValue(), "foo", null, null, null, null);
IndexType.CUSTOM.getValue(), "foo", null, null, null, null, null);
final PluginSetting pluginSetting = getPluginSetting(metadata);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
assertThat(indexConfiguration.getTemplateType(), equalTo(TemplateType.V1));
Expand All @@ -439,17 +483,17 @@ void getTemplateType_defaults_to_V1() {
@EnumSource(TemplateType.class)
void getTemplateType_with_configured_templateType(final TemplateType templateType) {
final Map<String, Object> metadata = initializeConfigMetaData(
IndexType.CUSTOM.getValue(), "foo", null, null, null, null);
IndexType.CUSTOM.getValue(), "foo", null, null, null, null, null);
metadata.put(TEMPLATE_TYPE, templateType.getTypeName());
final PluginSetting pluginSetting = getPluginSetting(metadata);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
assertThat(indexConfiguration.getTemplateType(), equalTo(templateType));
}

private PluginSetting generatePluginSetting(
final String indexType, final String indexAlias, final String templateFilePath,
final String indexType, final String indexAlias, final String templateFilePath, final String templateContent,
final Long bulkSize, final Long flushTimeout, final String documentIdField) {
final Map<String, Object> metadata = initializeConfigMetaData(indexType, indexAlias, templateFilePath, bulkSize, flushTimeout, documentIdField);
final Map<String, Object> metadata = initializeConfigMetaData(indexType, indexAlias, templateFilePath, templateContent, bulkSize, flushTimeout, documentIdField);
return getPluginSetting(metadata);
}

Expand All @@ -458,7 +502,7 @@ private PluginSetting getPluginSetting(Map<String, Object> metadata) {
}

private Map<String, Object> initializeConfigMetaData(
String indexType, String indexAlias, String templateFilePath, Long bulkSize, Long flushTimeout, String documentId) {
String indexType, String indexAlias, String templateFilePath, String templateContent, Long bulkSize, Long flushTimeout, String documentId) {
final Map<String, Object> metadata = new HashMap<>();
if (indexType != null) {
metadata.put(IndexConfiguration.INDEX_TYPE, indexType);
Expand All @@ -469,6 +513,11 @@ private Map<String, Object> initializeConfigMetaData(
if (templateFilePath != null) {
metadata.put(IndexConfiguration.TEMPLATE_FILE, templateFilePath);
}

if (templateContent != null) {
metadata.put(IndexConfiguration.TEMPLATE_CONTENT, templateContent);
}

if (bulkSize != null) {
metadata.put(IndexConfiguration.BULK_SIZE, bulkSize);
}
Expand All @@ -480,4 +529,12 @@ private Map<String, Object> initializeConfigMetaData(
}
return metadata;
}

private String createTemplateContent() {
return "{\"index_patterns\":[\"test-*\"]," +
"\"template\":{\"aliases\":{\"my_test_logs\":{}}," +
"\"settings\":{\"number_of_shards\":5,\"number_of_replicas\":2,\"refresh_interval\":-1}," +
"\"mappings\":{\"properties\":{\"timestamp\":{\"type\":\"date\",\"format\":\"yyyy-MM-ddHH:mm:ss||yyyy-MM-dd||epoch_millis\"}," +
"\"value\":{\"type\":\"double\"}}}}}";
}
}
Loading