Skip to content

Commit

Permalink
Add inline template_content support to the opensearch sink (#3431)
Browse files Browse the repository at this point in the history
Signed-off-by: Taylor Gray <tylgry@amazon.com>
  • Loading branch information
graytaylor0 committed Oct 6, 2023
1 parent 1e6a868 commit e19ae8f
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@

package org.opensearch.dataprepper.plugins.sink.opensearch.index;

import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.EnumUtils;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.plugins.sink.opensearch.DistributionVersion;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkAction;
import org.opensearch.dataprepper.plugins.sink.opensearch.s3.FileReader;
import org.opensearch.dataprepper.plugins.sink.opensearch.s3.S3ClientProvider;
import org.opensearch.dataprepper.plugins.sink.opensearch.s3.S3FileReader;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.arns.Arn;
Expand All @@ -27,8 +27,8 @@
import java.io.InputStream;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

Expand All @@ -44,6 +44,7 @@ public class IndexConfiguration {
public static final String INDEX_TYPE = "index_type";
public static final String TEMPLATE_TYPE = "template_type";
public static final String TEMPLATE_FILE = "template_file";
public static final String TEMPLATE_CONTENT = "template_content";
public static final String NUM_SHARDS = "number_of_shards";
public static final String NUM_REPLICAS = "number_of_replicas";
public static final String BULK_SIZE = "bulk_size";
Expand Down Expand Up @@ -105,7 +106,8 @@ private IndexConfiguration(final Builder builder) {
this.s3Client = builder.s3Client;

determineTemplateType(builder);
this.indexTemplate = readIndexTemplate(builder.templateFile, indexType, templateType);

this.indexTemplate = builder.templateContent != null ? readTemplateContent(builder.templateContent) : readIndexTemplate(builder.templateFile, indexType, templateType);

if (builder.numReplicas > 0) {
indexTemplate.putIfAbsent(SETTINGS, new HashMap<>());
Expand Down Expand Up @@ -187,6 +189,16 @@ public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetti
if (templateFile != null) {
builder = builder.withTemplateFile(templateFile);
}

final String templateContent = pluginSetting.getStringOrDefault(TEMPLATE_CONTENT, null);
if (templateContent != null) {
builder = builder.withTemplateContent(templateContent);
}

if (templateContent != null && templateFile != null) {
LOG.warn("Both template_content and template_file are configured. Only template_content will be used");
}

builder = builder.withNumShards(pluginSetting.getIntegerOrDefault(NUM_SHARDS, 0));
builder = builder.withNumReplicas(pluginSetting.getIntegerOrDefault(NUM_REPLICAS, 0));
final Long batchSize = pluginSetting.getLongOrDefault(BULK_SIZE, DEFAULT_BULK_SIZE);
Expand Down Expand Up @@ -365,6 +377,7 @@ private Map<String, Object> readIndexTemplate(final String templateFile, final I
templateURL = new File(templateFile).toURI().toURL();
}
}

if (templateURL != null) {
return new ObjectMapper().readValue(templateURL, new TypeReference<Map<String, Object>>() {
});
Expand All @@ -379,6 +392,14 @@ private Map<String, Object> readIndexTemplate(final String templateFile, final I
}
}

private Map<String, Object> readTemplateContent(final String templateContent) {
try {
return OBJECT_MAPPER.readValue(templateContent, new TypeReference<Map<String, Object>>() {});
} catch (IOException ex) {
throw new InvalidPluginConfigurationException(String.format("template_content is invalid: %s", ex.getMessage()));
}
}

private URL loadExistingTemplate(TemplateType templateType, String predefinedTemplateName) {
String resourcePath = templateType == TemplateType.V1 ? predefinedTemplateName : templateType.getTypeName() + "/" + predefinedTemplateName;
return getClass().getClassLoader()
Expand All @@ -390,6 +411,7 @@ public static class Builder {
private String indexType;
private TemplateType templateType;
private String templateFile;
private String templateContent;
private int numShards;
private int numReplicas;
private String routingField;
Expand Down Expand Up @@ -437,6 +459,12 @@ public Builder withTemplateFile(final String templateFile) {
return this;
}

public Builder withTemplateContent(final String templateContent) {
checkArgument(templateContent != null, "templateContent cannot be null.");
this.templateContent = templateContent;
return this;
}

public Builder withDocumentIdField(final String documentIdField) {
checkNotNull(documentIdField, "document_id_field cannot be null");
this.documentIdField = documentIdField;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@

package org.opensearch.dataprepper.plugins.sink.opensearch.index;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.plugins.sink.opensearch.DistributionVersion;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.http.AbortableInputStream;
Expand All @@ -34,6 +38,7 @@
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
Expand All @@ -52,6 +57,8 @@

@SuppressWarnings("unchecked")
public class IndexConfigurationTests {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private static final String DEFAULT_TEMPLATE_FILE = "test-template-withshards.json";
private static final String TEST_CUSTOM_INDEX_POLICY_FILE = "test-custom-index-policy-file.json";

Expand Down Expand Up @@ -193,7 +200,7 @@ public void testValidCustom_from_s3() {
}

@Test
public void testValidCustomWithNoTemplateFile() throws MalformedURLException {
public void testValidCustomWithNoTemplateFile() {
final String testIndexAlias = "foo";
IndexConfiguration indexConfiguration = new IndexConfiguration.Builder()
.withIndexAlias(testIndexAlias)
Expand Down Expand Up @@ -263,6 +270,43 @@ public void testValidCustomWithTemplateFileAndShards() {
assertEquals(-1, indexConfiguration.getBulkSize());
}

@Test
public void testValidCustomWithTemplateContent() throws JsonProcessingException {
final String testIndexAlias = "test";
IndexConfiguration indexConfiguration = new IndexConfiguration.Builder()
.withIndexAlias(testIndexAlias)
.withTemplateContent(createTemplateContent())
.withBulkSize(10)
.build();

assertEquals(IndexType.CUSTOM, indexConfiguration.getIndexType());
assertEquals(testIndexAlias, indexConfiguration.getIndexAlias());
assertEquals(10, indexConfiguration.getBulkSize());
assertFalse(indexConfiguration.getIndexTemplate().isEmpty());
assertThat(indexConfiguration.getIndexTemplate(), equalTo(OBJECT_MAPPER.readValue(createTemplateContent(), new TypeReference<>() {})));
}

@Test
public void readIndexConfigWithTemplateFileAndTemplateContentUsesTemplateContent() throws JsonProcessingException {
final PluginSetting pluginSetting = generatePluginSetting("custom", "test", "test-file", createTemplateContent(), null, null, null);

final IndexConfiguration objectUnderTest = IndexConfiguration.readIndexConfig(pluginSetting);

assertThat(objectUnderTest, notNullValue());
assertThat(objectUnderTest.getIndexTemplate(), notNullValue());
assertThat(objectUnderTest.getIndexTemplate(), equalTo(OBJECT_MAPPER.readValue(createTemplateContent(), new TypeReference<>() {})));
}

@Test
public void invalidTemplateContentThrowsInvalidPluginConfigurationException() {
final String invalidTemplateContent = UUID.randomUUID().toString();

final PluginSetting pluginSetting = generatePluginSetting("custom", null, null, invalidTemplateContent, null, null, null);

assertThrows(InvalidPluginConfigurationException.class, () -> IndexConfiguration.readIndexConfig(pluginSetting));

}

@Test
public void testInvalidCustom() {
// Missing index alias
Expand All @@ -274,7 +318,7 @@ public void testInvalidCustom() {
@Test
public void testReadIndexConfig_RawIndexType() {
final Map<String, Object> metadata = initializeConfigMetaData(
IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null, null, null, null);
IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null, null, null, null, null);
final PluginSetting pluginSetting = getPluginSetting(metadata);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
final URL expTemplateFile = indexConfiguration
Expand All @@ -292,15 +336,15 @@ public void testReadIndexConfig_RawIndexType() {
@Test
public void testReadIndexConfig_InvalidIndexTypeValueString() {
final Map<String, Object> metadata = initializeConfigMetaData(
"i-am-an-illegitimate-index-type", null, null, null, null, null);
"i-am-an-illegitimate-index-type", null, null, null, null, null, null);
final PluginSetting pluginSetting = getPluginSetting(metadata);
assertThrows(IllegalArgumentException.class, () -> IndexConfiguration.readIndexConfig(pluginSetting));
}

@Test
public void testReadIndexConfig_ServiceMapIndexType() {
final Map<String, Object> metadata = initializeConfigMetaData(
IndexType.TRACE_ANALYTICS_SERVICE_MAP.getValue(), null, null, null, null, null);
IndexType.TRACE_ANALYTICS_SERVICE_MAP.getValue(), null, null, null, null, null, null);
final PluginSetting pluginSetting = getPluginSetting(metadata);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
final URL expTemplateFile = indexConfiguration
Expand All @@ -324,7 +368,7 @@ public void testReadIndexConfigCustom() {
final long testFlushTimeout = 30_000L;
final String testIdField = "someId";
final PluginSetting pluginSetting = generatePluginSetting(
null, testIndexAlias, defaultTemplateFilePath, testBulkSize, testFlushTimeout, testIdField);
null, testIndexAlias, defaultTemplateFilePath, null, testBulkSize, testFlushTimeout, testIdField);
pluginSetting.getSettings().put(IndexConfiguration.ESTIMATE_BULK_SIZE_USING_COMPRESSION, true);
pluginSetting.getSettings().put(IndexConfiguration.MAX_LOCAL_COMPRESSIONS_FOR_ESTIMATION, 5);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
Expand All @@ -348,7 +392,7 @@ public void testReadIndexConfig_ExplicitCustomIndexType() {
final long testFlushTimeout = 30_000L;
final String testIdField = "someId";
final Map<String, Object> metadata = initializeConfigMetaData(
testIndexType, testIndexAlias, defaultTemplateFilePath, testBulkSize, testFlushTimeout, testIdField);
testIndexType, testIndexAlias, defaultTemplateFilePath, null, testBulkSize, testFlushTimeout, testIdField);
final PluginSetting pluginSetting = getPluginSetting(metadata);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
assertEquals(IndexType.CUSTOM, indexConfiguration.getIndexType());
Expand All @@ -363,7 +407,7 @@ public void testReadIndexConfig_ExplicitCustomIndexType() {
public void testReadIndexConfig_awsOptionServerlessDefault() {
final String testIndexAlias = "foo";
final Map<String, Object> metadata = initializeConfigMetaData(
null, testIndexAlias, null, null, null, null);
null, testIndexAlias, null, null, null, null, null);
metadata.put(AWS_OPTION, Map.of(SERVERLESS, true));
final PluginSetting pluginSetting = getPluginSetting(metadata);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
Expand All @@ -375,7 +419,7 @@ public void testReadIndexConfig_awsOptionServerlessDefault() {
public void testReadIndexConfig_awsServerlessIndexTypeOverride() {
final String testIndexAlias = "foo";
final Map<String, Object> metadata = initializeConfigMetaData(
IndexType.CUSTOM.getValue(), testIndexAlias, null, null, null, null);
IndexType.CUSTOM.getValue(), testIndexAlias, null, null, null, null, null);
metadata.put(AWS_OPTION, Map.of(SERVERLESS, true));
final PluginSetting pluginSetting = getPluginSetting(metadata);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
Expand All @@ -387,7 +431,7 @@ public void testReadIndexConfig_awsServerlessIndexTypeOverride() {
@Test
public void testReadIndexConfig_distributionVersionDefault() {
final Map<String, Object> metadata = initializeConfigMetaData(
null, "foo", null, null, null, null);
null, "foo", null,null, null, null, null);
final PluginSetting pluginSetting = getPluginSetting(metadata);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
assertEquals(indexConfiguration.getDistributionVersion(), DistributionVersion.DEFAULT);
Expand All @@ -396,7 +440,7 @@ public void testReadIndexConfig_distributionVersionDefault() {
@Test
public void testReadIndexConfig_es6Override() {
final Map<String, Object> metadata = initializeConfigMetaData(
null, "foo", null, null, null, null);
null, "foo", null, null, null, null, null);
metadata.put(DISTRIBUTION_VERSION, "es6");
metadata.put(TEMPLATE_TYPE, TemplateType.INDEX_TEMPLATE.getTypeName());
final PluginSetting pluginSetting = getPluginSetting(metadata);
Expand All @@ -409,7 +453,7 @@ public void testReadIndexConfig_es6Override() {
@Test
public void testReadIndexConfig_documentRootKey() {
final Map<String, Object> metadata = initializeConfigMetaData(
IndexType.CUSTOM.getValue(), "foo", null, null, null, null);
IndexType.CUSTOM.getValue(), "foo", null, null, null, null, null);
final String expectedRootKey = UUID.randomUUID().toString();
metadata.put(DOCUMENT_ROOT_KEY, expectedRootKey);
final PluginSetting pluginSetting = getPluginSetting(metadata);
Expand All @@ -420,7 +464,7 @@ public void testReadIndexConfig_documentRootKey() {
@Test
public void testReadIndexConfig_emptyDocumentRootKey() {
final Map<String, Object> metadata = initializeConfigMetaData(
IndexType.CUSTOM.getValue(), "foo", null, null, null, null);
IndexType.CUSTOM.getValue(), "foo", null, null, null, null, null);
metadata.put(DOCUMENT_ROOT_KEY, "");
final PluginSetting pluginSetting = getPluginSetting(metadata);
assertThrows(IllegalArgumentException.class, () -> IndexConfiguration.readIndexConfig(pluginSetting));
Expand All @@ -429,7 +473,7 @@ public void testReadIndexConfig_emptyDocumentRootKey() {
@Test
void getTemplateType_defaults_to_V1() {
final Map<String, Object> metadata = initializeConfigMetaData(
IndexType.CUSTOM.getValue(), "foo", null, null, null, null);
IndexType.CUSTOM.getValue(), "foo", null, null, null, null, null);
final PluginSetting pluginSetting = getPluginSetting(metadata);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
assertThat(indexConfiguration.getTemplateType(), equalTo(TemplateType.V1));
Expand All @@ -439,17 +483,17 @@ void getTemplateType_defaults_to_V1() {
@EnumSource(TemplateType.class)
void getTemplateType_with_configured_templateType(final TemplateType templateType) {
final Map<String, Object> metadata = initializeConfigMetaData(
IndexType.CUSTOM.getValue(), "foo", null, null, null, null);
IndexType.CUSTOM.getValue(), "foo", null, null, null, null, null);
metadata.put(TEMPLATE_TYPE, templateType.getTypeName());
final PluginSetting pluginSetting = getPluginSetting(metadata);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
assertThat(indexConfiguration.getTemplateType(), equalTo(templateType));
}

private PluginSetting generatePluginSetting(
final String indexType, final String indexAlias, final String templateFilePath,
final String indexType, final String indexAlias, final String templateFilePath, final String templateContent,
final Long bulkSize, final Long flushTimeout, final String documentIdField) {
final Map<String, Object> metadata = initializeConfigMetaData(indexType, indexAlias, templateFilePath, bulkSize, flushTimeout, documentIdField);
final Map<String, Object> metadata = initializeConfigMetaData(indexType, indexAlias, templateFilePath, templateContent, bulkSize, flushTimeout, documentIdField);
return getPluginSetting(metadata);
}

Expand All @@ -458,7 +502,7 @@ private PluginSetting getPluginSetting(Map<String, Object> metadata) {
}

private Map<String, Object> initializeConfigMetaData(
String indexType, String indexAlias, String templateFilePath, Long bulkSize, Long flushTimeout, String documentId) {
String indexType, String indexAlias, String templateFilePath, String templateContent, Long bulkSize, Long flushTimeout, String documentId) {
final Map<String, Object> metadata = new HashMap<>();
if (indexType != null) {
metadata.put(IndexConfiguration.INDEX_TYPE, indexType);
Expand All @@ -469,6 +513,11 @@ private Map<String, Object> initializeConfigMetaData(
if (templateFilePath != null) {
metadata.put(IndexConfiguration.TEMPLATE_FILE, templateFilePath);
}

if (templateContent != null) {
metadata.put(IndexConfiguration.TEMPLATE_CONTENT, templateContent);
}

if (bulkSize != null) {
metadata.put(IndexConfiguration.BULK_SIZE, bulkSize);
}
Expand All @@ -480,4 +529,12 @@ private Map<String, Object> initializeConfigMetaData(
}
return metadata;
}

private String createTemplateContent() {
return "{\"index_patterns\":[\"test-*\"]," +
"\"template\":{\"aliases\":{\"my_test_logs\":{}}," +
"\"settings\":{\"number_of_shards\":5,\"number_of_replicas\":2,\"refresh_interval\":-1}," +
"\"mappings\":{\"properties\":{\"timestamp\":{\"type\":\"date\",\"format\":\"yyyy-MM-ddHH:mm:ss||yyyy-MM-dd||epoch_millis\"}," +
"\"value\":{\"type\":\"double\"}}}}}";
}
}

0 comments on commit e19ae8f

Please sign in to comment.