Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Taylor Gray <tylgry@amazon.com>
  • Loading branch information
graytaylor0 committed Nov 6, 2023
1 parent 1f57208 commit bcd150a
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -387,12 +387,21 @@ public Map<String, Object> toMap() {
return mapper.convertValue(jsonNode, MAP_TYPE_REFERENCE);
}


public static boolean isValidEventKey(final String key) {
try {
checkKey(key);
return true;
} catch (final Exception e) {
return false;
}
}
private String checkAndTrimKey(final String key) {
checkKey(key);
return trimKey(key);
}

private void checkKey(final String key) {
private static void checkKey(final String key) {
checkNotNull(key, "key cannot be null");
checkArgument(!key.isEmpty(), "key cannot be an empty string");
if (key.length() > MAX_KEY_LENGTH) {
Expand All @@ -409,7 +418,7 @@ private String trimKey(final String key) {
return trimmedLeadingSlash.endsWith(SEPARATOR) ? trimmedLeadingSlash.substring(0, trimmedLeadingSlash.length() - 2) : trimmedLeadingSlash;
}

private boolean isValidKey(final String key) {
private static boolean isValidKey(final String key) {
for (int i = 0; i < key.length(); i++) {
char c = key.charAt(i);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,11 @@ void testJsonStringBuilderWithExcludeKeys() {

}

@ParameterizedTest
@CsvSource(value = {"test_key, true", "/test_key, true", "inv(alid, false", "getMetadata(\"test_key\"), false"})
void isValidEventKey_returns_expected_result(final String key, final boolean isValid) {
assertThat(JacksonEvent.isValidEventKey(key), equalTo(isValid));
}

private static Map<String, Object> createComplexDataMap() {
final Map<String, Object> dataObject = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.antlr.v4.runtime.tree.ParseTree;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;

import javax.inject.Inject;
import javax.inject.Named;
Expand Down Expand Up @@ -65,10 +66,8 @@ public Boolean isValidFormatExpressions(final String format) {
String name = format.substring(position + 2, endPosition);

Object val;
// We only check the expression if it matches (.*) to mitigate the issue with the antlr logger ()
// These can't be keys because (, ) is invalid for keys, so we attempt to validate the expression. All expressions currently
// contain a function ( ) so this will detect them to validate against.
if (name.matches("(.*)") && !isValidExpressionStatement(name)) {
// Invalid if it is not a valid key and not a valid expression statement
if (!JacksonEvent.isValidEventKey(name) && !isValidExpressionStatement(name)) {
return false;
}
fromIndex = endPosition + 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,9 +646,11 @@ public void testOpenSearchBulkActionsCreateWithExpression() throws IOException,
pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField);
Event event = (Event)testRecords.get(0).getData();
event.getMetadata().setAttribute("action", "create");
final String actionFormatExpression = "${getMetadata(\"action\")}";
when(expressionEvaluator.isValidFormatExpressions(actionFormatExpression)).thenReturn(true);
when(expressionEvaluator.isValidExpressionStatement("getMetadata(\"action\")")).thenReturn(true);
when(expressionEvaluator.evaluate("getMetadata(\"action\")", event)).thenReturn(event.getMetadata().getAttribute("action"));
pluginSetting.getSettings().put(IndexConfiguration.ACTION, "${getMetadata(\"action\")}");
pluginSetting.getSettings().put(IndexConfiguration.ACTION, actionFormatExpression);
final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true);
sink.output(testRecords);
final List<Map<String, Object>> retSources = getSearchResponseDocSources(testIndexAlias);
Expand Down Expand Up @@ -677,9 +679,11 @@ public void testOpenSearchBulkActionsCreateWithInvalidExpression() throws IOExce
pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField);
Event event = (Event)testRecords.get(0).getData();
event.getMetadata().setAttribute("action", "unknown");
final String actionFormatExpression = "${getMetadata(\"action\")}";
when(expressionEvaluator.isValidFormatExpressions(actionFormatExpression)).thenReturn(true);
when(expressionEvaluator.isValidExpressionStatement("getMetadata(\"action\")")).thenReturn(true);
when(expressionEvaluator.evaluate("getMetadata(\"action\")", event)).thenReturn(event.getMetadata().getAttribute("action"));
pluginSetting.getSettings().put(IndexConfiguration.ACTION, "${getMetadata(\"action\")}");
pluginSetting.getSettings().put(IndexConfiguration.ACTION, actionFormatExpression);
final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true);
sink.output(testRecords);
final List<Map<String, Object>> retSources = getSearchResponseDocSources(testIndexAlias);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,10 @@ private AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> createBulkReq
if (bulkItemResponse.error() != null) {
if (!NON_RETRY_STATUS.contains(bulkItemResponse.status())) {
requestToReissue.addOperation(bulkOperation);
} else if (VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
documentsVersionConflictErrors.increment();
LOG.debug("Received version conflict from OpenSearch: {}", bulkItemResponse.error().reason());
bulkOperation.releaseEventHandle(true);
} else {
nonRetryableFailures.add(FailedBulkOperation.builder()
.withBulkOperation(bulkOperation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
public static final String INVALID_ACTION_ERRORS = "invalidActionErrors";
public static final String BULKREQUEST_SIZE_BYTES = "bulkRequestSizeBytes";
public static final String DYNAMIC_INDEX_DROPPED_EVENTS = "dynamicIndexDroppedEvents";
public static final String INVALID_VERSION_EXPRESSION_DROPPED_EVENTS = "invalidVersionExpressionDroppedEvents";
public static final String INVALID_VERSION_EXPRESSION_DROPPED_EVENTS = "dynamicDocumentVersionDroppedEvents";

private static final Logger LOG = LoggerFactory.getLogger(OpenSearchSink.class);
private static final int INITIALIZE_RETRY_WAIT_TIME_MS = 5000;
Expand Down Expand Up @@ -122,7 +122,7 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
private final Counter invalidActionErrorsCounter;
private final Counter dynamicIndexDroppedEvents;
private final DistributionSummary bulkRequestSizeBytesSummary;
private final Counter versionExpressionDroppedEventsCounter;
private final Counter dynamicDocumentVersionDroppedEvents;
private OpenSearchClient openSearchClient;
private ObjectMapper objectMapper;
private volatile boolean initialized;
Expand Down Expand Up @@ -151,7 +151,7 @@ public OpenSearchSink(final PluginSetting pluginSetting,
invalidActionErrorsCounter = pluginMetrics.counter(INVALID_ACTION_ERRORS);
dynamicIndexDroppedEvents = pluginMetrics.counter(DYNAMIC_INDEX_DROPPED_EVENTS);
bulkRequestSizeBytesSummary = pluginMetrics.summary(BULKREQUEST_SIZE_BYTES);
versionExpressionDroppedEventsCounter = pluginMetrics.counter(INVALID_VERSION_EXPRESSION_DROPPED_EVENTS);
dynamicDocumentVersionDroppedEvents = pluginMetrics.counter(INVALID_VERSION_EXPRESSION_DROPPED_EVENTS);

this.openSearchSinkConfig = OpenSearchSinkConfiguration.readESConfig(pluginSetting, expressionEvaluator);
this.bulkSize = ByteSizeUnit.MB.toBytes(openSearchSinkConfig.getIndexConfiguration().getBulkSize());
Expand Down Expand Up @@ -371,13 +371,13 @@ public void doOutput(final Collection<Record<Event>> records) {
versionExpressionEvaluationResult = event.formatString(versionExpression, expressionEvaluator);
version = Long.valueOf(event.formatString(versionExpression, expressionEvaluator));
} catch (final NumberFormatException e) {
LOG.warn("Unable to convert the result of evaluating version_expression {} to Long for an Event. {} must be a valid Long type", versionExpression, versionExpressionEvaluationResult);
LOG.warn("Unable to convert the result of evaluating document_version '{}' to Long for an Event. The evaluation result '{}' must be a valid Long type", versionExpression, versionExpressionEvaluationResult);
logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e);
versionExpressionDroppedEventsCounter.increment();
dynamicDocumentVersionDroppedEvents.increment();
} catch (final RuntimeException e) {
LOG.error("There was an exception when evaluating the version_expression {}. Check the dlq if configured to see details about the affected Event: {}", versionExpression, e.getMessage());
LOG.error("There was an exception when evaluating the document_version '{}'. Check the dlq if configured to see details about the affected Event: {}", versionExpression, e.getMessage());
logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e);
versionExpressionDroppedEventsCounter.increment();
dynamicDocumentVersionDroppedEvents.increment();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,11 +374,13 @@ public void testExecuteWithMaxRetriesAndSuccesses() throws Exception {
final IndexOperation<SerializedJson> indexOperation2 = new IndexOperation.Builder<SerializedJson>().index(testIndex).id("2").document(arbitraryDocument()).build();
final IndexOperation<SerializedJson> indexOperation3 = new IndexOperation.Builder<SerializedJson>().index(testIndex).id("3").document(arbitraryDocument()).build();
final IndexOperation<SerializedJson> indexOperation4 = new IndexOperation.Builder<SerializedJson>().index(testIndex).id("4").document(arbitraryDocument()).build();
final IndexOperation<SerializedJson> indexOperation5 = new IndexOperation.Builder<SerializedJson>().index(testIndex).id("5").document(arbitraryDocument()).build();
final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder());
accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation1).build(), eventHandle1));
accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation2).build(), eventHandle2));
accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation3).build(), eventHandle3));
accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation4).build(), eventHandle4));
accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation5).build(), eventHandle5));
bulkRetryStrategy.execute(accumulatingBulkRequest);
MatcherAssert.assertThat(maxRetriesLimitReached, equalTo(true));
assertEquals(numEventsSucceeded, 2);
Expand Down Expand Up @@ -407,7 +409,6 @@ public void testExecuteNonRetryableResponse() throws Exception {
final IndexOperation<SerializedJson> indexOperation2 = new IndexOperation.Builder<SerializedJson>().index(testIndex).id("2").document(arbitraryDocument()).build();
final IndexOperation<SerializedJson> indexOperation3 = new IndexOperation.Builder<SerializedJson>().index(testIndex).id("3").document(arbitraryDocument()).build();
final IndexOperation<SerializedJson> indexOperation4 = new IndexOperation.Builder<SerializedJson>().index(testIndex).id("4").document(arbitraryDocument()).build();
final IndexOperation<SerializedJson> indexOperation5 = new IndexOperation.Builder<SerializedJson>().index(testIndex).id("5").document(arbitraryDocument()).build();
final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder());
accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation1).build(), eventHandle1));
accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation2).build(), eventHandle2));
Expand Down Expand Up @@ -604,7 +605,7 @@ private BulkResponse bulkSecondResponse(final BulkRequest bulkRequest) {

private BulkResponse bulkSecondResponseWithFailures(final BulkRequest bulkRequest) {
final int requestSize = bulkRequest.operations().size();
assert requestSize == 3;
assert requestSize == 2;
final List<BulkResponseItem> bulkItemResponses = Arrays.asList(
internalServerErrorItemResponse(index), internalServerErrorItemResponse(index));
return new BulkResponse.Builder().items(bulkItemResponses).errors(true).took(10).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,16 +132,17 @@ public void testReadESConfigWithBulkActionCreate() {
@Test
public void testReadESConfigWithBulkActionCreateExpression() {

final String actionFormatExpression = "${getMetadata(\"action\")}";
final Map<String, Object> metadata = new HashMap<>();
metadata.put(IndexConfiguration.INDEX_TYPE, IndexType.TRACE_ANALYTICS_RAW.getValue());
metadata.put(IndexConfiguration.ACTION, "${getMetadata(\"action\")}");
metadata.put(IndexConfiguration.ACTION, actionFormatExpression);
metadata.put(ConnectionConfiguration.HOSTS, TEST_HOSTS);

final PluginSetting pluginSetting = new PluginSetting(PLUGIN_NAME, metadata);
pluginSetting.setPipelineName(PIPELINE_NAME);

expressionEvaluator = mock(ExpressionEvaluator.class);
when(expressionEvaluator.isValidExpressionStatement(anyString())).thenReturn(true);
when(expressionEvaluator.isValidFormatExpressions(actionFormatExpression)).thenReturn(true);
final OpenSearchSinkConfiguration openSearchSinkConfiguration =
OpenSearchSinkConfiguration.readESConfig(pluginSetting, expressionEvaluator);

Expand Down

0 comments on commit bcd150a

Please sign in to comment.