diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PluginSetting.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PluginSetting.java index eae5362001..6924cb35bc 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PluginSetting.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PluginSetting.java @@ -157,6 +157,34 @@ public List getTypedList(final String attribute, final Class type) { return (List) object; } + /** + * Returns the value of the specified {@literal List>}, or {@code defaultValue} if this settings contains no value for + * the attribute. + * + * @param keyType key type of the Map + * @param valueType value type stored in the Map + * @param The key type + * @param The value type + * @return the value of the specified attribute, or {@code defaultValue} if this settings contains no value for + * the attribute + */ + public List> getTypedListOfMaps(final String attribute, final Class keyType, final Class valueType) { + Object object = getAttributeOrDefault(attribute, null); + if (object == null) { + return null; + } + + checkObjectType(attribute, object, List.class); + + for (final Map listItem: (List>) object) { + ((Map) listItem).forEach((key, value) -> { + checkObjectType(attribute, key, keyType); + checkObjectType(attribute, value, valueType); + }); + } + return (List>) object; + } + /** * Returns the value of the specified {@literal Map object}, or {@code defaultValue} if this settings contains no value for * the attribute. diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java index 133abfdb2f..a27906b2cc 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.model.event; import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import com.fasterxml.jackson.databind.JsonNode; import java.io.Serializable; import java.util.List; @@ -72,6 +73,14 @@ public interface Event extends Serializable { */ String toJsonString(); + /** + * Returns the JsonNode containing the internal representation of the event + * + * @return JsonNode + * @since 2.5 + */ + JsonNode getJsonNode(); + /** * Gets a serialized Json string of the specific key in the Event * diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java index 389a73db31..f1c5357a11 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java @@ -119,7 +119,8 @@ private JsonNode getInitialJsonNode(final Object data) { return mapper.valueToTree(data); } - protected JsonNode getJsonNode() { + @Override + public JsonNode getJsonNode() { return jsonNode; } @@ -319,6 +320,28 @@ public String formatString(final String format, final ExpressionEvaluator expres return formatStringInternal(format, expressionEvaluator); } + public static boolean isValidFormatExpressions(final String format, final ExpressionEvaluator expressionEvaluator) { + if (Objects.isNull(expressionEvaluator)) { + return false; + } + int fromIndex = 0; + int position = 0; + while ((position = format.indexOf("${", fromIndex)) != -1) { + int endPosition = format.indexOf("}", position + 1); + if (endPosition == -1) { + return false; + } + String name = format.substring(position + 2, endPosition); + + Object val; + if (!expressionEvaluator.isValidExpressionStatement(name)) { + return false; + } + fromIndex = endPosition + 1; + } + return true; + } + private String formatStringInternal(final String format, final ExpressionEvaluator expressionEvaluator) { int fromIndex = 0; String result = ""; diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/PluginSettingsTests.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/PluginSettingsTests.java index d1a058165d..679062e4fe 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/PluginSettingsTests.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/PluginSettingsTests.java @@ -46,6 +46,7 @@ public class PluginSettingsTests { private static final String TEST_INT_ATTRIBUTE = "int-attribute"; private static final String TEST_STRING_ATTRIBUTE = "string-attribute"; private static final String TEST_STRINGLIST_ATTRIBUTE = "list-attribute"; + private static final String TEST_LIST_OF_MAPS_ATTRIBUTE = "map-list-attribute"; private static final String TEST_STRINGMAP_ATTRIBUTE = "map-attribute"; private static final String TEST_STRINGLISTMAP_ATTRIBUTE = "list-map-attribute"; private static final String TEST_BOOL_ATTRIBUTE = "bool-attribute"; @@ -148,6 +149,16 @@ public void testGetTypedMap() { assertThat(pluginSetting.getTypedMap(TEST_STRINGMAP_ATTRIBUTE, String.class, String.class), is(equalTo(TEST_STRINGMAP_VALUE))); } + @Test + public void testGetTypedListOfMaps() { + final Map TEST_SETTINGS_MAP = ImmutableMap.of(TEST_STRING_ATTRIBUTE, TEST_STRING_VALUE); + final List> TEST_SETTINGS_LIST = List.of(TEST_SETTINGS_MAP); + final Map TEST_SETTINGS = ImmutableMap.of(TEST_LIST_OF_MAPS_ATTRIBUTE, TEST_SETTINGS_LIST); + final PluginSetting pluginSetting = new PluginSetting(TEST_PLUGIN_NAME, TEST_SETTINGS); + + assertThat(pluginSetting.getTypedListOfMaps(TEST_LIST_OF_MAPS_ATTRIBUTE, String.class, String.class), is(equalTo(List.of(TEST_SETTINGS_MAP)))); + } + @Test public void testGetTypedListMap() { final Map TEST_SETTINGS = ImmutableMap.of(TEST_STRINGLISTMAP_ATTRIBUTE, TEST_STRINGLISTMAP_VALUE); @@ -270,6 +281,20 @@ public void testGetTypedListMap_AsNull() { assertThat(pluginSetting.getTypedListMap(TEST_STRINGLISTMAP_NULL_ATTRIBUTE, String.class, String.class), nullValue()); } + /** + * Request attributes are present with null values, expect nulls to be returned + */ + @Test + public void testGetTypedListOfMaps_AsNull() { + final String TEST_STRINGLISTOFMAPS_NULL_ATTRIBUTE = "typedlistofmaps-null-attribute"; + final Map TEST_SETTINGS_AS_NULL = new HashMap<>(); + + TEST_SETTINGS_AS_NULL.put(TEST_STRINGLISTOFMAPS_NULL_ATTRIBUTE, null); + final PluginSetting pluginSetting = new PluginSetting(TEST_PLUGIN_NAME, TEST_SETTINGS_AS_NULL); + + assertThat(pluginSetting.getTypedListOfMaps(TEST_STRINGLISTOFMAPS_NULL_ATTRIBUTE, String.class, String.class), nullValue()); + } + /** * Request attributes are present with null values, expect nulls to be returned */ diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java index 7b2dd728da..f407d7ad12 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java @@ -530,6 +530,25 @@ public void testBuild_withFormatString(String formattedString, String finalStrin assertThat(event.formatString(formattedString), is(equalTo(finalString))); } + @ParameterizedTest + @CsvSource({ + "abc-${/foo, false", + "abc-${/foo}, true", + "abc-${getMetadata(\"key\")}, true", + "abc-${getXYZ(\"key\")}, false" + }) + public void testBuild_withIsValidFormatExpressions(final String format, final Boolean expectedResult) { + final ExpressionEvaluator expressionEvaluator = mock(ExpressionEvaluator.class); + when(expressionEvaluator.isValidExpressionStatement("/foo")).thenReturn(true); + when(expressionEvaluator.isValidExpressionStatement("getMetadata(\"key\")")).thenReturn(true); + assertThat(JacksonEvent.isValidFormatExpressions(format, expressionEvaluator), equalTo(expectedResult)); + } + + @Test + public void testBuild_withIsValidFormatExpressionsWithNullEvaluator() { + assertThat(JacksonEvent.isValidFormatExpressions("${}", null), equalTo(false)); + } + @Test public void testBuild_withFormatStringWithExpressionEvaluator() { diff --git a/data-prepper-plugins/opensearch/README.md b/data-prepper-plugins/opensearch/README.md index 2bede56c52..9fc8061708 100644 --- a/data-prepper-plugins/opensearch/README.md +++ b/data-prepper-plugins/opensearch/README.md @@ -146,6 +146,28 @@ e.g. [otel-v1-apm-span-index-template.json](https://github.com/opensearch-projec - `dlq_file`(optional): A String of absolute file path for DLQ failed output records. Defaults to null. If not provided, failed records will be written into the default data-prepper log file (`logs/Data-Prepper.log`). If the `dlq` option is present along with this, an error is thrown. +- `action`(optional): A string indicating the type of action to be performed. Supported values are "create", "update", "upsert", "delete" and "index". Default value is "index". It also be an expression which evaluates to one of the supported values mentioned earlier. + +- `actions`(optional): This is an alternative to `action`. `actions` can have multiple actions, each with a condition. The first action for which the condition evaluates to true is picked as the action for an event. The action must be one of the supported values mentioned under `action` field above. Just like in case of `action`, the `type` mentioned in `actions` can be an expression which evaluates to one of the supported values. For example, the following configuration shows different action types for different conditions. + +``` + sink: + - opensearch + actions: + - type: "create" + when: "/some_key == CREATE" + - type: "index" + when: "/some_key == INDEX" + - type: "upsert" + when: "/some_key == UPSERT" + - type: "update" + when: "/some_key == UPDATE" + - type: "delete" + when: "/some_key == DELETE" + # default case + - type: "index" +``` + - `dlq` (optional): DLQ configurations. See [DLQ](https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/README.md) for details. If the `dlq_file` option is present along with this, an error is thrown. - `max_retries`(optional): A number indicating the maximum number of times OpenSearch Sink should try to push the data to the OpenSearch server before considering it as failure. Defaults to `Integer.MAX_VALUE`. diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index 84aa8b174e..7dccdaaa68 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -104,6 +104,7 @@ public class OpenSearchSinkIT { private static final String PIPELINE_NAME = "integTestPipeline"; private static final String TEST_CUSTOM_INDEX_POLICY_FILE = "test-custom-index-policy-file.json"; private static final String TEST_TEMPLATE_V1_FILE = "test-index-template.json"; + private static final String TEST_TEMPLATE_BULK_FILE = "test-bulk-template.json"; private static final String TEST_TEMPLATE_V2_FILE = "test-index-template-v2.json"; private static final String TEST_INDEX_TEMPLATE_V1_FILE = "test-composable-index-template.json"; private static final String TEST_INDEX_TEMPLATE_V2_FILE = "test-composable-index-template-v2.json"; @@ -640,6 +641,248 @@ public void testBulkActionCreate() throws IOException, InterruptedException { Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0); } + @Test + public void testBulkActionCreateWithExpression() throws IOException, InterruptedException { + final String testIndexAlias = "test-alias"; + final String testTemplateFile = Objects.requireNonNull( + getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile(); + final String testIdField = "someId"; + final String testId = "foo"; + final List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId))); + final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); + pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); + Event event = (Event)testRecords.get(0).getData(); + event.getMetadata().setAttribute("action", "create"); + when(expressionEvaluator.isValidExpressionStatement("getMetadata(\"action\")")).thenReturn(true); + when(expressionEvaluator.evaluate("getMetadata(\"action\")", event)).thenReturn(event.getMetadata().getAttribute("action")); + pluginSetting.getSettings().put(IndexConfiguration.ACTION, "${getMetadata(\"action\")}"); + final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + sink.output(testRecords); + final List> retSources = getSearchResponseDocSources(testIndexAlias); + MatcherAssert.assertThat(retSources.size(), equalTo(1)); + MatcherAssert.assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); + sink.shutdown(); + + // verify metrics + final List bulkRequestLatencies = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_LATENCY).toString()); + MatcherAssert.assertThat(bulkRequestLatencies.size(), equalTo(3)); + // COUNT + Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0); + } + + @Test + public void testBulkActionCreateWithInvalidExpression() throws IOException, InterruptedException { + final String testIndexAlias = "test-alias"; + final String testTemplateFile = Objects.requireNonNull( + getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile(); + final String testIdField = "someId"; + final String testId = "foo"; + final List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId))); + final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); + pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); + Event event = (Event)testRecords.get(0).getData(); + event.getMetadata().setAttribute("action", "unknown"); + when(expressionEvaluator.isValidExpressionStatement("getMetadata(\"action\")")).thenReturn(true); + when(expressionEvaluator.evaluate("getMetadata(\"action\")", event)).thenReturn(event.getMetadata().getAttribute("action")); + pluginSetting.getSettings().put(IndexConfiguration.ACTION, "${getMetadata(\"action\")}"); + final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + sink.output(testRecords); + final List> retSources = getSearchResponseDocSources(testIndexAlias); + MatcherAssert.assertThat(retSources.size(), equalTo(0)); + MatcherAssert.assertThat(sink.getInvalidActionErrorsCount(), equalTo(1.0)); + sink.shutdown(); + } + + @Test + public void testBulkActionCreateWithActions() throws IOException, InterruptedException { + final String testIndexAlias = "test-alias"; + final String testTemplateFile = Objects.requireNonNull( + getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile(); + final String testIdField = "someId"; + final String testId = "foo"; + final List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId))); + + final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); + pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); + List> aList = new ArrayList<>(); + Map aMap = new HashMap<>(); + aMap.put("type", BulkAction.CREATE.toString()); + aList.add(aMap); + pluginSetting.getSettings().put(IndexConfiguration.ACTIONS, aList); + final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + sink.output(testRecords); + final List> retSources = getSearchResponseDocSources(testIndexAlias); + MatcherAssert.assertThat(retSources.size(), equalTo(1)); + MatcherAssert.assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); + sink.shutdown(); + + // verify metrics + final List bulkRequestLatencies = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_LATENCY).toString()); + MatcherAssert.assertThat(bulkRequestLatencies.size(), equalTo(3)); + // COUNT + Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0); + } + + @Test + public void testBulkActionUpdateWithActions() throws IOException, InterruptedException { + final String testIndexAlias = "test-alias-upd1"; + final String testTemplateFile = Objects.requireNonNull( + getClass().getClassLoader().getResource(TEST_TEMPLATE_BULK_FILE)).getFile(); + + final String testIdField = "someId"; + final String testId = "foo"; + List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson2(testIdField, testId, "name", "value1"))); + + final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); + pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); + List> aList = new ArrayList<>(); + Map aMap = new HashMap<>(); + aMap.put("type", BulkAction.CREATE.toString()); + aList.add(aMap); + pluginSetting.getSettings().put(IndexConfiguration.ACTIONS, aList); + OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + sink.output(testRecords); + List> retSources = getSearchResponseDocSources(testIndexAlias); + MatcherAssert.assertThat(retSources.size(), equalTo(1)); + MatcherAssert.assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); + sink.shutdown(); + + // verify metrics + final List bulkRequestLatencies = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_LATENCY).toString()); + MatcherAssert.assertThat(bulkRequestLatencies.size(), equalTo(3)); + // COUNT + Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0); + testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson2(testIdField, testId, "name", "value2"))); + aList = new ArrayList<>(); + aMap = new HashMap<>(); + aMap.put("type", BulkAction.UPDATE.toString()); + aList.add(aMap); + pluginSetting.getSettings().put(IndexConfiguration.ACTIONS, aList); + sink = createObjectUnderTest(pluginSetting, true); + sink.output(testRecords); + retSources = getSearchResponseDocSources(testIndexAlias); + + MatcherAssert.assertThat(retSources.size(), equalTo(1)); + Map source = retSources.get(0); + MatcherAssert.assertThat((String)source.get("name"), equalTo("value2")); + MatcherAssert.assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); + sink.shutdown(); + } + + @Test + public void testBulkActionUpsertWithActions() throws IOException, InterruptedException { + final String testIndexAlias = "test-alias-upd2"; + final String testTemplateFile = Objects.requireNonNull( + getClass().getClassLoader().getResource(TEST_TEMPLATE_BULK_FILE)).getFile(); + + final String testIdField = "someId"; + final String testId = "foo"; + List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson2(testIdField, testId, "name", "value1"))); + + final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); + pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); + List> aList = new ArrayList<>(); + Map aMap = new HashMap<>(); + aMap.put("type", BulkAction.CREATE.toString()); + aList.add(aMap); + pluginSetting.getSettings().put(IndexConfiguration.ACTIONS, aList); + OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + sink.output(testRecords); + List> retSources = getSearchResponseDocSources(testIndexAlias); + MatcherAssert.assertThat(retSources.size(), equalTo(1)); + MatcherAssert.assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); + sink.shutdown(); + + // verify metrics + final List bulkRequestLatencies = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_LATENCY).toString()); + MatcherAssert.assertThat(bulkRequestLatencies.size(), equalTo(3)); + // COUNT + Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0); + testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson3(testIdField, testId, "name", "value3", "newKey", "newValue"))); + aList = new ArrayList<>(); + aMap = new HashMap<>(); + aMap.put("type", BulkAction.UPSERT.toString()); + aList.add(aMap); + pluginSetting.getSettings().put(IndexConfiguration.ACTIONS, aList); + sink = createObjectUnderTest(pluginSetting, true); + sink.output(testRecords); + retSources = getSearchResponseDocSources(testIndexAlias); + + MatcherAssert.assertThat(retSources.size(), equalTo(1)); + Map source = retSources.get(0); + MatcherAssert.assertThat((String)source.get("name"), equalTo("value3")); + MatcherAssert.assertThat((String)source.get("newKey"), equalTo("newValue")); + MatcherAssert.assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); + sink.shutdown(); + } + + @Test + public void testBulkActionUpsertWithoutCreate() throws IOException, InterruptedException { + final String testIndexAlias = "test-alias-upd2"; + final String testTemplateFile = Objects.requireNonNull( + getClass().getClassLoader().getResource(TEST_TEMPLATE_BULK_FILE)).getFile(); + + final String testIdField = "someId"; + final String testId = "foo"; + List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson3(testIdField, testId, "name", "value1", "newKey", "newValue"))); + final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); + pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); + List> aList = new ArrayList<>(); + Map aMap = new HashMap<>(); + aMap.put("type", BulkAction.UPSERT.toString()); + aList.add(aMap); + pluginSetting.getSettings().put(IndexConfiguration.ACTIONS, aList); + OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + sink.output(testRecords); + List> retSources = getSearchResponseDocSources(testIndexAlias); + + MatcherAssert.assertThat(retSources.size(), equalTo(1)); + Map source = retSources.get(0); + MatcherAssert.assertThat((String)source.get("name"), equalTo("value1")); + MatcherAssert.assertThat((String)source.get("newKey"), equalTo("newValue")); + MatcherAssert.assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); + sink.shutdown(); + // verify metrics + final List bulkRequestLatencies = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_LATENCY).toString()); + MatcherAssert.assertThat(bulkRequestLatencies.size(), equalTo(3)); + // COUNT + Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0); + } + + @Test + public void testBulkActionDeleteWithActions() throws IOException, InterruptedException { + final String testIndexAlias = "test-alias-upd1"; + final String testTemplateFile = Objects.requireNonNull( + getClass().getClassLoader().getResource(TEST_TEMPLATE_BULK_FILE)).getFile(); + + final String testIdField = "someId"; + final String testId = "foo"; + List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId))); + + final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); + pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); + List> aList = new ArrayList<>(); + Map aMap = new HashMap<>(); + aMap.put("type", BulkAction.DELETE.toString()); + aList.add(aMap); + pluginSetting.getSettings().put(IndexConfiguration.ACTIONS, aList); + OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + sink.output(testRecords); + List> retSources = getSearchResponseDocSources(testIndexAlias); + MatcherAssert.assertThat(retSources.size(), equalTo(0)); + sink.shutdown(); + } + @Test @DisabledIf(value = "isES6", disabledReason = TRACE_INGESTION_TEST_DISABLED_REASON) public void testEventOutputWithTags() throws IOException, InterruptedException { @@ -961,6 +1204,27 @@ private String generateCustomRecordJson(final String idField, final String docum ); } + private String generateCustomRecordJson2(final String idField, final String documentId, final String key, final String value) throws IOException { + return Strings.toString( + XContentFactory.jsonBuilder() + .startObject() + .field(idField, documentId) + .field(key, value) + .endObject() + ); + } + + private String generateCustomRecordJson3(final String idField, final String documentId, final String key1, final String value1, final String key2, final String value2) throws IOException { + return Strings.toString( + XContentFactory.jsonBuilder() + .startObject() + .field(idField, documentId) + .field(key1, value1) + .field(key2, value2) + .endObject() + ); + } + private String readDocFromFile(final String filename) throws IOException { final StringBuilder jsonBuilder = new StringBuilder(); try (final InputStream inputStream = Objects.requireNonNull( @@ -1089,6 +1353,7 @@ private void wipeAllOpenSearchIndices() throws IOException { .filter(Predicate.not(indexName -> indexName.startsWith(".opendistro_"))) .filter(Predicate.not(indexName -> indexName.startsWith(".opensearch-"))) .filter(Predicate.not(indexName -> indexName.startsWith(".opensearch_"))) + .filter(Predicate.not(indexName -> indexName.startsWith(".plugins-ml-config"))) .forEach(indexName -> { try { client.performRequest(new Request("DELETE", "/" + indexName)); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationWrapper.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationWrapper.java index efdd62e07e..f4c1ebb0b9 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationWrapper.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationWrapper.java @@ -7,6 +7,7 @@ import org.opensearch.client.opensearch.core.bulk.BulkOperation; import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.SerializedJson; import java.util.List; import java.util.Map; @@ -19,6 +20,8 @@ public class BulkOperationWrapper { private static final Predicate IS_INDEX_OPERATION = BulkOperation::isIndex; private static final Predicate IS_CREATE_OPERATION = BulkOperation::isCreate; + private static final Predicate IS_UPDATE_OPERATION = BulkOperation::isUpdate; + private static final Predicate IS_DELETE_OPERATION = BulkOperation::isDelete; private static final Map, Function> BULK_OPERATION_TO_DOCUMENT_CONVERTERS = Map.of( IS_INDEX_OPERATION, operation -> operation.index().document(), @@ -27,26 +30,40 @@ public class BulkOperationWrapper { private static final Map, Function> BULK_OPERATION_TO_INDEX_NAME_CONVERTERS = Map.of( IS_INDEX_OPERATION, operation -> operation.index().index(), - IS_CREATE_OPERATION, operation -> operation.create().index() + IS_CREATE_OPERATION, operation -> operation.create().index(), + IS_UPDATE_OPERATION, operation -> operation.update().index(), + IS_DELETE_OPERATION, operation -> operation.delete().index() ); private static final Map, Function> BULK_OPERATION_TO_ID_CONVERTERS = Map.of( IS_INDEX_OPERATION, operation -> operation.index().id(), - IS_CREATE_OPERATION, operation -> operation.create().id() + IS_CREATE_OPERATION, operation -> operation.create().id(), + IS_UPDATE_OPERATION, operation -> operation.update().id(), + IS_DELETE_OPERATION, operation -> operation.delete().id() ); private final EventHandle eventHandle; private final BulkOperation bulkOperation; + private final SerializedJson jsonNode; public BulkOperationWrapper(final BulkOperation bulkOperation) { this.bulkOperation = bulkOperation; this.eventHandle = null; + this.jsonNode = null; + } + + public BulkOperationWrapper(final BulkOperation bulkOperation, final EventHandle eventHandle, final SerializedJson jsonNode) { + checkNotNull(bulkOperation); + this.bulkOperation = bulkOperation; + this.eventHandle = eventHandle; + this.jsonNode = jsonNode; } public BulkOperationWrapper(final BulkOperation bulkOperation, final EventHandle eventHandle) { checkNotNull(bulkOperation); this.bulkOperation = bulkOperation; this.eventHandle = eventHandle; + this.jsonNode = null; } public BulkOperation getBulkOperation() { @@ -64,6 +81,9 @@ public void releaseEventHandle(boolean result) { } public Object getDocument() { + if (bulkOperation.isUpdate() || bulkOperation.isDelete()) { + return jsonNode; + } return getValueFromConverter(BULK_OPERATION_TO_DOCUMENT_CONVERTERS); } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 45a1638c16..097020382c 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.sink.opensearch; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.JsonNode; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Timer; @@ -15,6 +16,8 @@ import org.opensearch.client.opensearch.core.BulkRequest; import org.opensearch.client.opensearch.core.bulk.BulkOperation; import org.opensearch.client.opensearch.core.bulk.CreateOperation; +import org.opensearch.client.opensearch.core.bulk.UpdateOperation; +import org.opensearch.client.opensearch.core.bulk.DeleteOperation; import org.opensearch.client.opensearch.core.bulk.IndexOperation; import org.opensearch.client.transport.TransportOptions; import org.opensearch.common.unit.ByteSizeUnit; @@ -67,6 +70,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.StringJoiner; @@ -79,6 +83,7 @@ public class OpenSearchSink extends AbstractSink> { public static final String BULKREQUEST_LATENCY = "bulkRequestLatency"; public static final String BULKREQUEST_ERRORS = "bulkRequestErrors"; + public static final String INVALID_ACTION_ERRORS = "invalidActionErrors"; public static final String BULKREQUEST_SIZE_BYTES = "bulkRequestSizeBytes"; public static final String DYNAMIC_INDEX_DROPPED_EVENTS = "dynamicIndexDroppedEvents"; @@ -102,12 +107,14 @@ public class OpenSearchSink extends AbstractSink> { private final String documentId; private final String routingField; private final String action; + private final List> actions; private final String documentRootKey; private String configuredIndexAlias; private final ReentrantLock lock; private final Timer bulkRequestTimer; private final Counter bulkRequestErrorsCounter; + private final Counter invalidActionErrorsCounter; private final Counter dynamicIndexDroppedEvents; private final DistributionSummary bulkRequestSizeBytesSummary; private OpenSearchClient openSearchClient; @@ -135,10 +142,11 @@ public OpenSearchSink(final PluginSetting pluginSetting, this.expressionEvaluator = expressionEvaluator; bulkRequestTimer = pluginMetrics.timer(BULKREQUEST_LATENCY); bulkRequestErrorsCounter = pluginMetrics.counter(BULKREQUEST_ERRORS); + invalidActionErrorsCounter = pluginMetrics.counter(INVALID_ACTION_ERRORS); dynamicIndexDroppedEvents = pluginMetrics.counter(DYNAMIC_INDEX_DROPPED_EVENTS); bulkRequestSizeBytesSummary = pluginMetrics.summary(BULKREQUEST_SIZE_BYTES); - this.openSearchSinkConfig = OpenSearchSinkConfiguration.readESConfig(pluginSetting); + this.openSearchSinkConfig = OpenSearchSinkConfiguration.readESConfig(pluginSetting, expressionEvaluator); this.bulkSize = ByteSizeUnit.MB.toBytes(openSearchSinkConfig.getIndexConfiguration().getBulkSize()); this.flushTimeout = openSearchSinkConfig.getIndexConfiguration().getFlushTimeout(); this.indexType = openSearchSinkConfig.getIndexConfiguration().getIndexType(); @@ -146,6 +154,7 @@ public OpenSearchSink(final PluginSetting pluginSetting, this.documentId = openSearchSinkConfig.getIndexConfiguration().getDocumentId(); this.routingField = openSearchSinkConfig.getIndexConfiguration().getRoutingField(); this.action = openSearchSinkConfig.getIndexConfiguration().getAction(); + this.actions = openSearchSinkConfig.getIndexConfiguration().getActions(); this.documentRootKey = openSearchSinkConfig.getIndexConfiguration().getDocumentRootKey(); this.indexManagerFactory = new IndexManagerFactory(new ClusterSettingsParser()); this.failedBulkOperationConverter = new FailedBulkOperationConverter(pluginSetting.getPipelineName(), pluginSetting.getName(), @@ -239,11 +248,70 @@ private void doInitializeInternal() throws IOException { LOG.info("Initialized OpenSearch sink"); } + double getInvalidActionErrorsCount() { + return invalidActionErrorsCounter.count(); + } + @Override public boolean isReady() { return initialized; } + private BulkOperation getBulkOperationForAction(final String action, final SerializedJson document, final String indexName, final JsonNode jsonNode) { + BulkOperation bulkOperation; + final Optional docId = document.getDocumentId(); + final Optional routing = document.getRoutingField(); + + if (StringUtils.equals(action, BulkAction.CREATE.toString())) { + final CreateOperation.Builder createOperationBuilder = + new CreateOperation.Builder<>() + .index(indexName) + .document(document); + docId.ifPresent(createOperationBuilder::id); + routing.ifPresent(createOperationBuilder::routing); + bulkOperation = new BulkOperation.Builder() + .create(createOperationBuilder.build()) + .build(); + return bulkOperation; + } + if (StringUtils.equals(action, BulkAction.UPDATE.toString()) || + StringUtils.equals(action, BulkAction.UPSERT.toString())) { + final UpdateOperation.Builder updateOperationBuilder = (action.toLowerCase() == BulkAction.UPSERT.toString()) ? + new UpdateOperation.Builder<>() + .index(indexName) + .document(jsonNode) + .upsert(jsonNode) : + new UpdateOperation.Builder<>() + .index(indexName) + .document(jsonNode); + docId.ifPresent(updateOperationBuilder::id); + routing.ifPresent(updateOperationBuilder::routing); + bulkOperation = new BulkOperation.Builder() + .update(updateOperationBuilder.build()) + .build(); + return bulkOperation; + } + if (StringUtils.equals(action, BulkAction.DELETE.toString())) { + final DeleteOperation.Builder deleteOperationBuilder = + new DeleteOperation.Builder().index(indexName); + docId.ifPresent(deleteOperationBuilder::id); + routing.ifPresent(deleteOperationBuilder::routing); + bulkOperation = new BulkOperation.Builder() + .delete(deleteOperationBuilder.build()) + .build(); + return bulkOperation; + } + // Default to "index" + final IndexOperation.Builder indexOperationBuilder = + new IndexOperation.Builder<>().index(indexName).document(document); + docId.ifPresent(indexOperationBuilder::id); + routing.ifPresent(indexOperationBuilder::routing); + bulkOperation = new BulkOperation.Builder() + .index(indexOperationBuilder.build()) + .build(); + return bulkOperation; + } + @Override public void doOutput(final Collection> records) { final long threadId = Thread.currentThread().getId(); @@ -260,8 +328,6 @@ public void doOutput(final Collection> records) { for (final Record record : records) { final Event event = record.getData(); final SerializedJson document = getDocument(event); - final Optional docId = document.getDocumentId(); - final Optional routing = document.getRoutingField(); String indexName = configuredIndexAlias; try { indexName = indexManager.getIndexName(event.formatString(indexName, expressionEvaluator)); @@ -278,39 +344,35 @@ public void doOutput(final Collection> records) { continue; } - BulkOperation bulkOperation; - - if (StringUtils.equalsIgnoreCase(action, BulkAction.CREATE.toString())) { - - final CreateOperation.Builder createOperationBuilder = new CreateOperation.Builder<>() - .index(indexName) - .document(document); - - docId.ifPresent(createOperationBuilder::id); - routing.ifPresent(createOperationBuilder::routing); - - bulkOperation = new BulkOperation.Builder() - .create(createOperationBuilder.build()) - .build(); - - } else { - - // Default to "index" - - final IndexOperation.Builder indexOperationBuilder = new IndexOperation.Builder<>() - .index(indexName) - .document(document); - - docId.ifPresent(indexOperationBuilder::id); - routing.ifPresent(indexOperationBuilder::routing); - - bulkOperation = new BulkOperation.Builder() - .index(indexOperationBuilder.build()) - .build(); + String eventAction = action; + if (actions != null) { + for (final Map actionEntry: actions) { + final String condition = (String)actionEntry.get("when"); + eventAction = (String)actionEntry.get("type"); + if (condition != null && + expressionEvaluator.evaluateConditional(condition, event)) { + break; + } + } + } + if (eventAction.contains("${")) { + eventAction = event.formatString(eventAction, expressionEvaluator); + } + if (BulkAction.fromOptionValue(eventAction) == null) { + LOG.error("Unknown action {}, skipping the event", eventAction); + invalidActionErrorsCounter.increment(); + continue; + } + SerializedJson serializedJsonNode = null; + if (StringUtils.equals(action, BulkAction.UPDATE.toString()) || + StringUtils.equals(action, BulkAction.UPSERT.toString()) || + StringUtils.equals(action, BulkAction.DELETE.toString())) { + serializedJsonNode = SerializedJson.fromJsonNode(event.getJsonNode(), document); } + BulkOperation bulkOperation = getBulkOperationForAction(eventAction, document, indexName, event.getJsonNode()); - BulkOperationWrapper bulkOperationWrapper = new BulkOperationWrapper(bulkOperation, event.getEventHandle()); + BulkOperationWrapper bulkOperationWrapper = new BulkOperationWrapper(bulkOperation, event.getEventHandle(), serializedJsonNode); final long estimatedBytesBeforeAdd = bulkRequest.estimateSizeInBytesWithDocument(bulkOperationWrapper); if (bulkSize >= 0 && estimatedBytesBeforeAdd >= bulkSize && bulkRequest.getOperationsCount() > 0) { flushBatch(bulkRequest); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkConfiguration.java index 7c3c019cd5..4ec8f17bc8 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkConfiguration.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkConfiguration.java @@ -7,6 +7,7 @@ import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import static com.google.common.base.Preconditions.checkNotNull; @@ -40,9 +41,13 @@ private OpenSearchSinkConfiguration( } public static OpenSearchSinkConfiguration readESConfig(final PluginSetting pluginSetting) { + return readESConfig(pluginSetting, null); + } + + public static OpenSearchSinkConfiguration readESConfig(final PluginSetting pluginSetting, final ExpressionEvaluator expressionEvaluator) { final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(pluginSetting); - final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting); + final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting, expressionEvaluator); final RetryConfiguration retryConfiguration = RetryConfiguration.readRetryConfig(pluginSetting); return new OpenSearchSinkConfiguration(connectionConfiguration, indexConfiguration, retryConfiguration); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/BulkAction.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/BulkAction.java index bdebc3c18e..52823fee16 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/BulkAction.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/BulkAction.java @@ -14,6 +14,9 @@ public enum BulkAction { CREATE("create"), + UPSERT("upsert"), + UPDATE("update"), + DELETE("delete"), INDEX("index"); private static final Map ACTIONS_MAP = Arrays.stream(BulkAction.values()) @@ -34,7 +37,7 @@ public String toString() { } @JsonCreator - static BulkAction fromOptionValue(final String option) { + public static BulkAction fromOptionValue(final String option) { return ACTIONS_MAP.get(option.toLowerCase()); } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJson.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJson.java index 0568247eb1..671a4d0423 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJson.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJson.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.sink.opensearch.bulk; +import com.fasterxml.jackson.databind.JsonNode; import java.nio.charset.StandardCharsets; import java.util.Objects; import java.util.Optional; @@ -30,5 +31,8 @@ static SerializedJson fromStringAndOptionals(String jsonString, String docId, St return new SerializedJsonImpl(jsonString.getBytes(StandardCharsets.UTF_8), docId, routingField); } + static SerializedJson fromJsonNode(final JsonNode jsonNode, SerializedJson document) { + return new SerializedJsonNode(jsonNode, document); + } } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonNode.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonNode.java new file mode 100644 index 0000000000..181380816a --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonNode.java @@ -0,0 +1,49 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch.bulk; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.Serializable; +import java.util.Optional; + +class SerializedJsonNode implements SerializedJson, Serializable { + private byte[] document; + private JsonNode jsonNode; + private String documentId = null; + private String routingField = null; + + public SerializedJsonNode(final JsonNode jsonNode, SerializedJson doc) { + this.jsonNode = jsonNode; + this.documentId = doc.getDocumentId().get(); + this.routingField = doc.getRoutingField().get(); + this.document = jsonNode.toString().getBytes(); + } + + public SerializedJsonNode(final JsonNode jsonNode) { + this.jsonNode = jsonNode; + this.document = jsonNode.toString().getBytes(); + } + + @Override + public long getDocumentSize() { + return document.length; + } + + @Override + public byte[] getSerializedJson() { + return document; + } + + @Override + public Optional getDocumentId() { + return Optional.ofNullable(documentId); + } + + @Override + public Optional getRoutingField() { + return Optional.ofNullable(routingField); + } +} diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java index d346c9f61c..80bc3e3f06 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.sink.opensearch.index; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.JacksonEvent; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.EnumUtils; @@ -15,6 +16,7 @@ import org.opensearch.dataprepper.plugins.sink.opensearch.s3.FileReader; import org.opensearch.dataprepper.plugins.sink.opensearch.s3.S3ClientProvider; import org.opensearch.dataprepper.plugins.sink.opensearch.s3.S3FileReader; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.arns.Arn; @@ -26,6 +28,7 @@ import java.net.URL; import java.util.HashMap; import java.util.Map; +import java.util.List; import java.util.Objects; import java.util.Optional; @@ -56,6 +59,7 @@ public class IndexConfiguration { public static final int DEFAULT_MAX_LOCAL_COMPRESSIONS_FOR_ESTIMATION = 2; public static final long DEFAULT_FLUSH_TIMEOUT = 60_000L; public static final String ACTION = "action"; + public static final String ACTIONS = "actions"; public static final String S3_AWS_REGION = "s3_aws_region"; public static final String S3_AWS_STS_ROLE_ARN = "s3_aws_sts_role_arn"; public static final String S3_AWS_STS_EXTERNAL_ID = "s3_aws_sts_external_id"; @@ -77,6 +81,7 @@ public class IndexConfiguration { private final long flushTimeout; private final Optional ismPolicyFile; private final String action; + private final List> actions; private final String s3AwsRegion; private final String s3AwsStsRoleArn; private final String s3AwsExternalId; @@ -138,6 +143,7 @@ private IndexConfiguration(final Builder builder) { this.documentId = documentId; this.ismPolicyFile = builder.ismPolicyFile; this.action = builder.action; + this.actions = builder.actions; this.documentRootKey = builder.documentRootKey; } @@ -160,6 +166,10 @@ private void determineIndexType(Builder builder) { } public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetting) { + return readIndexConfig(pluginSetting, null); + } + + public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetting, final ExpressionEvaluator expressionEvaluator) { IndexConfiguration.Builder builder = new IndexConfiguration.Builder(); final String indexAlias = pluginSetting.getStringOrDefault(INDEX_ALIAS, null); if (indexAlias != null) { @@ -214,7 +224,13 @@ public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetti final String ismPolicyFile = pluginSetting.getStringOrDefault(ISM_POLICY_FILE, null); builder = builder.withIsmPolicyFile(ismPolicyFile); - builder.withAction(pluginSetting.getStringOrDefault(ACTION, BulkAction.INDEX.toString())); + List> actionsList = pluginSetting.getTypedListOfMaps(ACTIONS, String.class, Object.class); + + if (actionsList != null) { + builder.withActions(actionsList, expressionEvaluator); + } else { + builder.withAction(pluginSetting.getStringOrDefault(ACTION, BulkAction.INDEX.toString()), expressionEvaluator); + } if ((builder.templateFile != null && builder.templateFile.startsWith(S3_PREFIX)) || (builder.ismPolicyFile.isPresent() && builder.ismPolicyFile.get().startsWith(S3_PREFIX))) { @@ -295,6 +311,10 @@ public String getAction() { return action; } + public List> getActions() { + return actions; + } + public String getS3AwsRegion() { return s3AwsRegion; } @@ -381,6 +401,7 @@ public static class Builder { private long flushTimeout = DEFAULT_FLUSH_TIMEOUT; private Optional ismPolicyFile; private String action; + private List> actions; private String s3AwsRegion; private String s3AwsStsRoleArn; private String s3AwsStsExternalId; @@ -468,12 +489,23 @@ public Builder withIsmPolicyFile(final String ismPolicyFile) { return this; } - public Builder withAction(final String action) { - checkArgument(EnumUtils.isValidEnumIgnoreCase(BulkAction.class, action), "action must be one of the following: " + BulkAction.values()); + public Builder withAction(final String action, final ExpressionEvaluator expressionEvaluator) { + checkArgument((EnumUtils.isValidEnumIgnoreCase(BulkAction.class, action) || JacksonEvent.isValidFormatExpressions(action, expressionEvaluator)), "action must be one of the following: " + BulkAction.values()); this.action = action; return this; } + public Builder withActions(final List> actions, final ExpressionEvaluator expressionEvaluator) { + for (final Map actionMap: actions) { + String action = (String)actionMap.get("type"); + if (action != null) { + checkArgument((EnumUtils.isValidEnumIgnoreCase(BulkAction.class, action) || JacksonEvent.isValidFormatExpressions(action, expressionEvaluator)), "action must be one of the following: " + BulkAction.values()); + } + } + this.actions = actions; + return this; + } + public Builder withS3AwsRegion(final String s3AwsRegion) { checkNotNull(s3AwsRegion, "s3AwsRegion cannot be null"); this.s3AwsRegion = s3AwsRegion; diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationWrapperTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationWrapperTests.java index 4b03abe6d5..b87a155982 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationWrapperTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationWrapperTests.java @@ -5,12 +5,15 @@ package org.opensearch.dataprepper.plugins.sink.opensearch; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.JsonNode; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.opensearch.client.opensearch.core.bulk.BulkOperation; import org.opensearch.client.opensearch.core.bulk.CreateOperation; import org.opensearch.client.opensearch.core.bulk.DeleteOperation; +import org.opensearch.client.opensearch.core.bulk.UpdateOperation; import org.opensearch.client.opensearch.core.bulk.IndexOperation; import org.opensearch.dataprepper.model.event.EventHandle; import org.junit.jupiter.api.Test; @@ -21,7 +24,6 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; public class BulkOperationWrapperTests { @@ -61,12 +63,6 @@ public void testGetId(final BulkOperation bulkOperation) { assertThat(bulkOperationWrapper.getId(), equalTo(ID)); } - @Test - public void testGetIdUnsupportedAction() { - final BulkOperationWrapper bulkOperationWrapper = createObjectUnderTest(null, getDeleteBulkOperation()); - assertThrows(UnsupportedOperationException.class, bulkOperationWrapper::getId); - } - @ParameterizedTest @MethodSource("bulkOperationProvider") public void testGetIndex(final BulkOperation bulkOperation) { @@ -74,12 +70,6 @@ public void testGetIndex(final BulkOperation bulkOperation) { assertThat(bulkOperationWrapper.getIndex(), equalTo(INDEX)); } - @Test - public void testGetIndexUnsupportedAction() { - final BulkOperationWrapper bulkOperationWrapper = createObjectUnderTest(null, getDeleteBulkOperation()); - assertThrows(UnsupportedOperationException.class, bulkOperationWrapper::getIndex); - } - @ParameterizedTest @MethodSource("bulkOperationProvider") public void testGetDocument(final BulkOperation bulkOperation) { @@ -87,12 +77,6 @@ public void testGetDocument(final BulkOperation bulkOperation) { assertThat(bulkOperationWrapper.getDocument(), equalTo(DOCUMENT)); } - @Test - public void testGetDocumentUnsupportedAction() { - final BulkOperationWrapper bulkOperationWrapper = createObjectUnderTest(null, getDeleteBulkOperation()); - assertThrows(UnsupportedOperationException.class, bulkOperationWrapper::getDocument); - } - private static Stream bulkOperationProvider() { final IndexOperation indexOperation = new IndexOperation.Builder<>() .id(ID) @@ -112,10 +96,34 @@ private static Stream bulkOperationProvider() { .create(createOperation) .build(); + ObjectMapper objectMapper = new ObjectMapper(); + JsonNode jsonNode = null; + try { + jsonNode = objectMapper.readTree("{\"key\":\"value\"}"); + } catch (Exception e){} + + final UpdateOperation updateOperation = new UpdateOperation.Builder<>() + .id(ID) + .index(INDEX) + .document(DOCUMENT) + .build(); + final BulkOperation updateBulkOperation = (BulkOperation) new BulkOperation.Builder() + .update(updateOperation) + .build(); + + final DeleteOperation deleteOperation = new DeleteOperation.Builder() + .id(ID) + .index(INDEX) + .build(); + final BulkOperation deleteBulkOperation = (BulkOperation) new BulkOperation.Builder() + .delete(deleteOperation) + .build(); return Stream.of( Arguments.of( indexBulkOperation, - createBulkOperation + createBulkOperation, + updateBulkOperation, + deleteBulkOperation ) ); } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkConfigurationTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkConfigurationTests.java index d13d44d6cb..537d2bffac 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkConfigurationTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkConfigurationTests.java @@ -10,12 +10,18 @@ import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkAction; import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration; import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.ArrayList; import java.util.Map; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.ArgumentMatchers.anyString; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -23,6 +29,7 @@ public class OpenSearchSinkConfigurationTests { private final List TEST_HOSTS = Collections.singletonList("http://localhost:9200"); private static final String PLUGIN_NAME = "opensearch"; private static final String PIPELINE_NAME = "integTestPipeline"; + private ExpressionEvaluator expressionEvaluator; @Test public void testReadESConfig() { @@ -49,6 +56,61 @@ public void testInvalidAction() { } + @Test(expected = IllegalArgumentException.class) + public void testInvalidActions() { + + final Map metadata = new HashMap<>(); + metadata.put(IndexConfiguration.INDEX_TYPE, IndexType.TRACE_ANALYTICS_RAW.getValue()); + List> invalidActionList = new ArrayList<>(); + Map actionMap = new HashMap<>(); + actionMap.put("type", "invalid"); + invalidActionList.add(actionMap); + metadata.put(IndexConfiguration.ACTIONS, invalidActionList); + metadata.put(ConnectionConfiguration.HOSTS, TEST_HOSTS); + + final PluginSetting pluginSetting = new PluginSetting(PLUGIN_NAME, metadata); + pluginSetting.setPipelineName(PIPELINE_NAME); + + OpenSearchSinkConfiguration.readESConfig(pluginSetting); + + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidActionWithExpression() { + + final Map metadata = new HashMap<>(); + metadata.put(IndexConfiguration.INDEX_TYPE, IndexType.TRACE_ANALYTICS_RAW.getValue()); + metadata.put(IndexConfiguration.ACTION, "${anInvalidFunction()}"); + metadata.put(ConnectionConfiguration.HOSTS, TEST_HOSTS); + + final PluginSetting pluginSetting = new PluginSetting(PLUGIN_NAME, metadata); + pluginSetting.setPipelineName(PIPELINE_NAME); + + expressionEvaluator = mock(ExpressionEvaluator.class); + when(expressionEvaluator.isValidExpressionStatement(anyString())).thenReturn(false); + OpenSearchSinkConfiguration.readESConfig(pluginSetting, expressionEvaluator); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidActionsWithExpression() { + + final Map metadata = new HashMap<>(); + metadata.put(IndexConfiguration.INDEX_TYPE, IndexType.TRACE_ANALYTICS_RAW.getValue()); + List> invalidActionList = new ArrayList<>(); + Map actionMap = new HashMap<>(); + actionMap.put("type", "${anInvalidFunction()}"); + invalidActionList.add(actionMap); + metadata.put(IndexConfiguration.ACTIONS, invalidActionList); + metadata.put(ConnectionConfiguration.HOSTS, TEST_HOSTS); + + final PluginSetting pluginSetting = new PluginSetting(PLUGIN_NAME, metadata); + pluginSetting.setPipelineName(PIPELINE_NAME); + + expressionEvaluator = mock(ExpressionEvaluator.class); + when(expressionEvaluator.isValidExpressionStatement(anyString())).thenReturn(false); + OpenSearchSinkConfiguration.readESConfig(pluginSetting, expressionEvaluator); + } + @Test public void testReadESConfigWithBulkActionCreate() { @@ -68,6 +130,27 @@ public void testReadESConfigWithBulkActionCreate() { assertNotNull(openSearchSinkConfiguration.getRetryConfiguration()); } + @Test + public void testReadESConfigWithBulkActionCreateExpression() { + + final Map metadata = new HashMap<>(); + metadata.put(IndexConfiguration.INDEX_TYPE, IndexType.TRACE_ANALYTICS_RAW.getValue()); + metadata.put(IndexConfiguration.ACTION, "${getMetadata(\"action\")}"); + metadata.put(ConnectionConfiguration.HOSTS, TEST_HOSTS); + + final PluginSetting pluginSetting = new PluginSetting(PLUGIN_NAME, metadata); + pluginSetting.setPipelineName(PIPELINE_NAME); + + expressionEvaluator = mock(ExpressionEvaluator.class); + when(expressionEvaluator.isValidExpressionStatement(anyString())).thenReturn(true); + final OpenSearchSinkConfiguration openSearchSinkConfiguration = + OpenSearchSinkConfiguration.readESConfig(pluginSetting, expressionEvaluator); + + assertNotNull(openSearchSinkConfiguration.getConnectionConfiguration()); + assertNotNull(openSearchSinkConfiguration.getIndexConfiguration()); + assertNotNull(openSearchSinkConfiguration.getRetryConfiguration()); + } + private PluginSetting generatePluginSetting() { final Map metadata = new HashMap<>(); metadata.put(IndexConfiguration.INDEX_TYPE, IndexType.TRACE_ANALYTICS_RAW.getValue()); diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonNodeTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonNodeTest.java new file mode 100644 index 0000000000..1131a13b5d --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonNodeTest.java @@ -0,0 +1,61 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch.bulk; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Random; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.commons.lang3.RandomStringUtils; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +class SerializedJsonNodeTest { + private int documentSize; + private byte[] documentBytes; + private String documentId; + private String routingField; + private JsonNode jsonNode; + private SerializedJson document; + private String jsonString; + + @BeforeEach + void setUp() { + Random random = new Random(); + jsonString = "{\"key\":\"value\"}"; + documentSize = jsonString.length(); + + ObjectMapper objectMapper = new ObjectMapper(); + try { + jsonNode = objectMapper.readTree(jsonString); + } catch (Exception e) { + jsonNode = null; + } + documentId = RandomStringUtils.randomAlphabetic(10); + routingField = RandomStringUtils.randomAlphabetic(10); + document = SerializedJson.fromStringAndOptionals(jsonString, documentId, routingField); + } + + private SerializedJsonNode createObjectUnderTest() { + return new SerializedJsonNode(jsonNode, document); + } + + @Test + void getDocumentSize_returns_size_of_the_document_byte_array() { + assertThat(createObjectUnderTest().getDocumentSize(), equalTo((long) documentSize)); + } + + @Test + void getSerializedJson_returns_the_document_byte_array_and_fields() { + assertThat(createObjectUnderTest().getSerializedJson(), equalTo(jsonString.getBytes())); + assertThat(createObjectUnderTest().getDocumentId().get(), equalTo(documentId)); + assertThat(createObjectUnderTest().getRoutingField().get(), equalTo(routingField)); + } +} + diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonTest.java index 4d4a9221ca..836a1b1c4d 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonTest.java @@ -7,6 +7,8 @@ import org.junit.jupiter.api.Test; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.JsonNode; import org.apache.commons.lang3.RandomStringUtils; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; @@ -26,13 +28,32 @@ void fromString_throws_if_the_jsonString_is_null() { @Test void fromString_returns_SerializedJsonImpl_with_correctValues() { - String documentId = RandomStringUtils.randomAlphabetic(10); - String routingField = RandomStringUtils.randomAlphabetic(10); - SerializedJson serializedJson = SerializedJson.fromStringAndOptionals("{}", documentId, routingField); - assertThat(serializedJson, instanceOf(SerializedJsonImpl.class)); + String documentId = RandomStringUtils.randomAlphabetic(10); + String routingField = RandomStringUtils.randomAlphabetic(10); + SerializedJson serializedJson = SerializedJson.fromStringAndOptionals("{}", documentId, routingField); + assertThat(serializedJson, instanceOf(SerializedJsonImpl.class)); assertThat(serializedJson.getDocumentId().get(), equalTo(documentId)); assertThat(serializedJson.getRoutingField().get(), equalTo(routingField)); assertThat(serializedJson.getSerializedJson(), equalTo("{}".getBytes())); } + @Test + void fromString_returns_SerializedJsonNode_with_correctValues() { + String documentId = RandomStringUtils.randomAlphabetic(10); + String routingField = RandomStringUtils.randomAlphabetic(10); + final String jsonString = "{\"key\":\"value\"}"; + JsonNode jsonNode; + ObjectMapper objectMapper = new ObjectMapper(); + try { + jsonNode = objectMapper.readTree(jsonString); + } catch (Exception e) { + jsonNode = null; + } + SerializedJson document = SerializedJson.fromStringAndOptionals(jsonString, documentId, routingField); + SerializedJson serializedJson = SerializedJson.fromJsonNode(jsonNode, document); + assertThat(serializedJson, instanceOf(SerializedJsonNode.class)); + assertThat(serializedJson.getDocumentId().get(), equalTo(documentId)); + assertThat(serializedJson.getRoutingField().get(), equalTo(routingField)); + assertThat(serializedJson.getSerializedJson(), equalTo(jsonString.getBytes())); + } } diff --git a/data-prepper-plugins/opensearch/src/test/resources/test-bulk-template.json b/data-prepper-plugins/opensearch/src/test/resources/test-bulk-template.json new file mode 100644 index 0000000000..a3c50f967c --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/resources/test-bulk-template.json @@ -0,0 +1,27 @@ +{ + "version": 1, + "mappings": { + "date_detection": false, + "dynamic_templates": [ + { + "strings_as_keyword": { + "mapping": { + "ignore_above": 1024, + "type": "keyword" + }, + "match_mapping_type": "string" + } + } + ], + "_source": { + "enabled": true + }, + "properties": { + "name": { + "ignore_above": 1024, + "type": "keyword" + } + } + } +} +