Skip to content

Commit

Permalink
Set ingest processor supports copying from one field to another
Browse files Browse the repository at this point in the history
Signed-off-by: Gao Binlong <gbinlong@amazon.com>
  • Loading branch information
gaobinlong committed Oct 10, 2023
1 parent 74ffdb6 commit d864f14
Show file tree
Hide file tree
Showing 7 changed files with 273 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add unreferenced file cleanup count to merge stats ([#10204](https://github.com/opensearch-project/OpenSearch/pull/10204))
- [Remote Store] Add support to restrict creation & deletion if system repository and mutation of immutable settings of system repository ([#9839](https://github.com/opensearch-project/OpenSearch/pull/9839))
- Improve compressed request handling ([#10261](https://github.com/opensearch-project/OpenSearch/pull/10261))
- Set ingest processor supports copying from one field to another

### Dependencies
- Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@

import java.util.Map;

import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException;

/**
* Processor that adds new fields with their corresponding values. If the field is already present, its value
* will be replaced with the provided one.
Expand All @@ -54,9 +56,10 @@ public final class SetProcessor extends AbstractProcessor {
private final TemplateScript.Factory field;
private final ValueSource value;
private final boolean ignoreEmptyValue;
private final String copyFrom;

SetProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value) {
this(tag, description, field, value, true, false);
SetProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value, String copyFrom) {
this(tag, description, field, value, true, false, copyFrom);
}

SetProcessor(
Expand All @@ -65,13 +68,15 @@ public final class SetProcessor extends AbstractProcessor {
TemplateScript.Factory field,
ValueSource value,
boolean overrideEnabled,
boolean ignoreEmptyValue
boolean ignoreEmptyValue,
String copyFrom
) {
super(tag, description);
this.overrideEnabled = overrideEnabled;
this.field = field;
this.value = value;
this.ignoreEmptyValue = ignoreEmptyValue;
this.copyFrom = copyFrom;
}

public boolean isOverrideEnabled() {
Expand All @@ -90,10 +95,27 @@ public boolean isIgnoreEmptyValue() {
return ignoreEmptyValue;
}

public String getCopyFrom() {
return copyFrom;
}

@Override
public IngestDocument execute(IngestDocument document) {
if (overrideEnabled || document.hasField(field) == false || document.getFieldValue(field, Object.class) == null) {
document.setFieldValue(field, value, ignoreEmptyValue);
if (copyFrom != null) {
String path = document.renderTemplate(field);
if (copyFrom.isEmpty()) {
throw new IllegalArgumentException("copy_from cannot be empty");
}
Object sourceFieldValue = document.getFieldValue(copyFrom, Object.class, ignoreEmptyValue);
if (ignoreEmptyValue
&& (sourceFieldValue == null || sourceFieldValue instanceof String && ((String) sourceFieldValue).isEmpty())) {
return document;
}
document.setFieldValue(path, IngestDocument.deepCopy(sourceFieldValue));
} else {
document.setFieldValue(field, value, ignoreEmptyValue);
}
}
return document;
}
Expand All @@ -119,18 +141,20 @@ public SetProcessor create(
Map<String, Object> config
) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "override", true);
TemplateScript.Factory compiledTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag, "field", field, scriptService);
boolean ignoreEmptyValue = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_empty_value", false);
return new SetProcessor(
processorTag,
description,
compiledTemplate,
ValueSource.wrap(value, scriptService),
overrideEnabled,
ignoreEmptyValue
);
String copyFrom = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "copy_from");

ValueSource valueSource = null;
if (copyFrom == null) {
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
valueSource = ValueSource.wrap(value, scriptService);
} else if (config.get("value") != null) {
throw newConfigurationException(TYPE, processorTag, "copy_from", "either copy_from or value can be set");
}

return new SetProcessor(processorTag, description, compiledTemplate, valueSource, overrideEnabled, ignoreEmptyValue, copyFrom);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ public void testRestOfTheDocumentIsAvailable() throws Exception {
"_tag",
null,
new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"),
(model) -> model.get("other")
(model) -> model.get("other"),
null
),
false
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,16 @@ public void testInvalidMustacheTemplate() throws Exception {
assertThat(exception.getMetadata("opensearch.processor_tag").get(0), equalTo(processorTag));
}

public void testCopyFrom() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
config.put("value", "value1");
config.put("copy_from", "field2");
String processorTag = randomAlphaOfLength(10);
assertThrows(
"either copy_from or value can be set",
OpenSearchParseException.class,
() -> factory.create(null, processorTag, null, config)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.hamcrest.Matchers;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;

Expand All @@ -51,7 +53,7 @@ public void testSetExistingFields() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
String fieldName = RandomDocumentPicks.randomExistingFieldName(random(), ingestDocument);
Object fieldValue = RandomDocumentPicks.randomFieldValue(random());
Processor processor = createSetProcessor(fieldName, fieldValue, true, false);
Processor processor = createSetProcessor(fieldName, fieldValue, true, false, null);
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue));
Expand All @@ -63,7 +65,7 @@ public void testSetNewFields() throws Exception {
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
Object fieldValue = RandomDocumentPicks.randomFieldValue(random());
String fieldName = RandomDocumentPicks.addRandomField(random(), testIngestDocument, fieldValue);
Processor processor = createSetProcessor(fieldName, fieldValue, true, false);
Processor processor = createSetProcessor(fieldName, fieldValue, true, false, null);
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue));
Expand All @@ -72,7 +74,7 @@ public void testSetNewFields() throws Exception {
public void testSetFieldsTypeMismatch() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
ingestDocument.setFieldValue("field", "value");
Processor processor = createSetProcessor("field.inner", "value", true, false);
Processor processor = createSetProcessor("field.inner", "value", true, false, null);
try {
processor.execute(ingestDocument);
fail("processor execute should have failed");
Expand All @@ -88,7 +90,7 @@ public void testSetNewFieldWithOverrideDisabled() throws Exception {
IngestDocument ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
Object fieldValue = RandomDocumentPicks.randomFieldValue(random());
Processor processor = createSetProcessor(fieldName, fieldValue, false, false);
Processor processor = createSetProcessor(fieldName, fieldValue, false, false, null);
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue));
Expand All @@ -98,7 +100,7 @@ public void testSetExistingFieldWithOverrideDisabled() throws Exception {
IngestDocument ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>());
Object fieldValue = "foo";
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue);
Processor processor = createSetProcessor(fieldName, "bar", false, false);
Processor processor = createSetProcessor(fieldName, "bar", false, false, null);
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue));
Expand All @@ -109,60 +111,110 @@ public void testSetExistingNullFieldWithOverrideDisabled() throws Exception {
Object fieldValue = null;
Object newValue = "bar";
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue);
Processor processor = createSetProcessor(fieldName, newValue, false, false);
Processor processor = createSetProcessor(fieldName, newValue, false, false, null);
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(newValue));
}

public void testSetMetadataExceptVersion() throws Exception {
Metadata randomMetadata = randomFrom(Metadata.INDEX, Metadata.ID, Metadata.ROUTING);
Processor processor = createSetProcessor(randomMetadata.getFieldName(), "_value", true, false);
Processor processor = createSetProcessor(randomMetadata.getFieldName(), "_value", true, false, null);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(randomMetadata.getFieldName(), String.class), Matchers.equalTo("_value"));
}

public void testSetMetadataVersion() throws Exception {
long version = randomNonNegativeLong();
Processor processor = createSetProcessor(Metadata.VERSION.getFieldName(), version, true, false);
Processor processor = createSetProcessor(Metadata.VERSION.getFieldName(), version, true, false, null);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(Metadata.VERSION.getFieldName(), Long.class), Matchers.equalTo(version));
}

public void testSetMetadataVersionType() throws Exception {
String versionType = randomFrom("internal", "external", "external_gte");
Processor processor = createSetProcessor(Metadata.VERSION_TYPE.getFieldName(), versionType, true, false);
Processor processor = createSetProcessor(Metadata.VERSION_TYPE.getFieldName(), versionType, true, false, null);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(Metadata.VERSION_TYPE.getFieldName(), String.class), Matchers.equalTo(versionType));
}

public void testSetMetadataIfSeqNo() throws Exception {
long ifSeqNo = randomNonNegativeLong();
Processor processor = createSetProcessor(Metadata.IF_SEQ_NO.getFieldName(), ifSeqNo, true, false);
Processor processor = createSetProcessor(Metadata.IF_SEQ_NO.getFieldName(), ifSeqNo, true, false, null);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(Metadata.IF_SEQ_NO.getFieldName(), Long.class), Matchers.equalTo(ifSeqNo));
}

public void testSetMetadataIfPrimaryTerm() throws Exception {
long ifPrimaryTerm = randomNonNegativeLong();
Processor processor = createSetProcessor(Metadata.IF_PRIMARY_TERM.getFieldName(), ifPrimaryTerm, true, false);
Processor processor = createSetProcessor(Metadata.IF_PRIMARY_TERM.getFieldName(), ifPrimaryTerm, true, false, null);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(Metadata.IF_PRIMARY_TERM.getFieldName(), Long.class), Matchers.equalTo(ifPrimaryTerm));
}

private static Processor createSetProcessor(String fieldName, Object fieldValue, boolean overrideEnabled, boolean ignoreEmptyValue) {
public void testCopyFromWithIgnoreEmptyValue() throws Exception {
// do nothing if copy_from field does not exist
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String newFieldName = RandomDocumentPicks.randomFieldName(random());
Processor processor = createSetProcessor(newFieldName, null, true, true, RandomDocumentPicks.randomFieldName(random()));
processor.execute(ingestDocument);
assertFalse(ingestDocument.hasField(newFieldName));

// throw illegalArgumentException if copy_from is empty string
Processor processorWithEmptyCopyFrom = createSetProcessor(newFieldName, null, true, true, "");
assertThrows("copy_from cannot be empty", IllegalArgumentException.class, () -> processorWithEmptyCopyFrom.execute(ingestDocument));
}

public void testCopyFromOtherField() throws Exception {
// can copy different types of data from one field to another
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
Object existingFieldValue = RandomDocumentPicks.randomFieldValue(random());
String existingFieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, existingFieldValue);
String newFieldName = RandomDocumentPicks.randomFieldName(random());
Processor processor = createSetProcessor(newFieldName, null, true, false, existingFieldName);
processor.execute(ingestDocument);
assertTrue(ingestDocument.hasField(newFieldName));
assertDeepCopiedObjectEquals(ingestDocument.getFieldValue(newFieldName, Object.class), existingFieldValue);
}

@SuppressWarnings("unchecked")
private static void assertDeepCopiedObjectEquals(Object expected, Object actual) {
if (expected instanceof Map) {
Map<String, Object> expectedMap = (Map<String, Object>) expected;
Map<String, Object> actualMap = (Map<String, Object>) actual;
assertEquals(expectedMap.size(), actualMap.size());
for (Map.Entry<String, Object> expectedEntry : expectedMap.entrySet()) {
assertDeepCopiedObjectEquals(expectedEntry.getValue(), actualMap.get(expectedEntry.getKey()));
}
} else if (expected instanceof List) {
assertArrayEquals(((List<?>) expected).toArray(), ((List<?>) actual).toArray());
} else if (expected instanceof byte[]) {
assertArrayEquals((byte[]) expected, (byte[]) actual);
} else {
assertEquals(expected, actual);
}
}

private static Processor createSetProcessor(
String fieldName,
Object fieldValue,
boolean overrideEnabled,
boolean ignoreEmptyValue,
String copyFrom
) {
return new SetProcessor(
randomAlphaOfLength(10),
null,
new TestTemplateService.MockTemplateScript.Factory(fieldName),
ValueSource.wrap(fieldValue, TestTemplateService.instance()),
overrideEnabled,
ignoreEmptyValue
ignoreEmptyValue,
copyFrom
);
}
}
Loading

0 comments on commit d864f14

Please sign in to comment.