Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
kkondaka committed Aug 20, 2024
2 parents 5508672 + 385d438 commit aca587f
Show file tree
Hide file tree
Showing 59 changed files with 975 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,19 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.drop;
package org.opensearch.dataprepper.model.event;

import org.opensearch.dataprepper.model.event.Event;
import com.fasterxml.jackson.annotation.JsonCreator;
import org.slf4j.Logger;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;

enum HandleFailedEventsOption {
DROP("drop", true, false),
DROP_SILENTLY("drop_silently", true, true),
SKIP("skip", false, false),
SKIP_SILENTLY("skip_silently", false, true);
public enum HandleFailedEventsOption {
DROP("drop", true, true),
DROP_SILENTLY("drop_silently", true, false),
SKIP("skip", false, true),
SKIP_SILENTLY("skip_silently", false, false);

private static final Map<String, HandleFailedEventsOption> OPTIONS_MAP = Arrays.stream(HandleFailedEventsOption.values())
.collect(Collectors.toMap(
Expand All @@ -37,13 +33,14 @@ enum HandleFailedEventsOption {
this.isLogRequired = isLogRequired;
}

public boolean isDropEventOption(final Event event, final Throwable cause, final Logger log) {
if (isLogRequired) {
log.warn(EVENT, "An exception occurred while processing when expression for event {}", event, cause);
}
public boolean shouldDropEvent() {
return isDropEventOption;
}

public boolean shouldLog() {
return isLogRequired;
}

@JsonCreator
static HandleFailedEventsOption fromOptionValue(final String option) {
return OPTIONS_MAP.get(option.toLowerCase());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.event;

import org.hamcrest.CoreMatchers;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

class HandleFailedEventsOptionTest {
@ParameterizedTest
@EnumSource(HandleFailedEventsOption.class)
void fromOptionValue(final HandleFailedEventsOption option) {
assertThat(HandleFailedEventsOption.fromOptionValue(option.name()), CoreMatchers.is(option));

if (option == HandleFailedEventsOption.SKIP || option == HandleFailedEventsOption.SKIP_SILENTLY) {
assertThat(option.shouldDropEvent(), equalTo(false));
} else {
assertThat(option.shouldDropEvent(), equalTo(true));
}

if (option == HandleFailedEventsOption.SKIP_SILENTLY || option == HandleFailedEventsOption.DROP_SILENTLY) {
assertThat(option.shouldLog(), equalTo(false));
} else {
assertThat(option.shouldLog(), equalTo(true));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotEmpty;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;

public class DropEventProcessorConfig {
@JsonProperty("drop_when")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;

/**
* @since 1.3
*
Expand Down Expand Up @@ -57,7 +60,10 @@ public boolean isStatementFalseWith(final Event event) {
try {
return !expressionEvaluator.evaluateConditional(dropWhen, event);
} catch (final Exception e) {
return handleFailedEventsSetting.isDropEventOption(event, e, LOG);
if (handleFailedEventsSetting.shouldLog()) {
LOG.warn(EVENT, "An exception occurred while processing when expression for event [{}]", event, e);
}
return handleFailedEventsSetting.shouldDropEvent();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.junit.jupiter.api.BeforeEach;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;

import java.util.UUID;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;

import java.util.UUID;
import java.util.stream.Stream;
Expand Down

This file was deleted.

1 change: 1 addition & 0 deletions data-prepper-plugins/parse-json-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ plugins {
dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKey;
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.record.Record;
import io.micrometer.core.instrument.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -29,30 +33,40 @@

public abstract class AbstractParseProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractParseProcessor.class);
private static final String PROCESSING_FAILURES = "processingFailures";

private final String source;
private final String destination;
private final EventKey source;
private final EventKey destination;
private final String pointer;
private final String parseWhen;
private final List<String> tagsOnFailure;
private final boolean overwriteIfDestinationExists;
private final boolean deleteSourceRequested;

private final HandleFailedEventsOption handleFailedEventsOption;

protected final Counter processingFailuresCounter;

private final ExpressionEvaluator expressionEvaluator;
private final EventKeyFactory eventKeyFactory;

protected AbstractParseProcessor(PluginMetrics pluginMetrics,
CommonParseConfig commonParseConfig,
ExpressionEvaluator expressionEvaluator) {
protected AbstractParseProcessor(final PluginMetrics pluginMetrics,
final CommonParseConfig commonParseConfig,
final ExpressionEvaluator expressionEvaluator,
final EventKeyFactory eventKeyFactory) {
super(pluginMetrics);

source = commonParseConfig.getSource();
destination = commonParseConfig.getDestination();
source = eventKeyFactory.createEventKey(commonParseConfig.getSource(), EventKeyFactory.EventAction.GET, EventKeyFactory.EventAction.DELETE);
destination = commonParseConfig.getDestination() != null ? eventKeyFactory.createEventKey(commonParseConfig.getDestination(), EventKeyFactory.EventAction.PUT, EventKeyFactory.EventAction.GET) : null;
pointer = commonParseConfig.getPointer();
parseWhen = commonParseConfig.getParseWhen();
tagsOnFailure = commonParseConfig.getTagsOnFailure();
overwriteIfDestinationExists = commonParseConfig.getOverwriteIfDestinationExists();
deleteSourceRequested = commonParseConfig.isDeleteSourceRequested();
handleFailedEventsOption = commonParseConfig.getHandleFailedEventsOption();
processingFailuresCounter = pluginMetrics.counter(PROCESSING_FAILURES);
this.expressionEvaluator = expressionEvaluator;
this.eventKeyFactory = eventKeyFactory;
}

/**
Expand Down Expand Up @@ -100,7 +114,10 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
event.delete(this.source);
}
} catch (Exception e) {
LOG.error(EVENT, "An exception occurred while using the {} processor on Event [{}]", getProcessorName(), record.getData(), e);
processingFailuresCounter.increment();
if (handleFailedEventsOption.shouldLog()) {
LOG.error(EVENT, "An exception occurred while using the {} processor on Event [{}]", getProcessorName(), record.getData(), e);
}
}
}
return records;
Expand Down Expand Up @@ -128,7 +145,8 @@ private String getProcessorName() {
private Map<String, Object> parseUsingPointer(final Event event, final Map<String, Object> parsedJson, final String pointer,
final boolean doWriteToRoot) {
final Event temporaryEvent = JacksonEvent.builder().withEventType("event").build();
temporaryEvent.put(source, parsedJson);
final EventKey temporaryPutKey = eventKeyFactory.createEventKey(source.getKey(), EventKeyFactory.EventAction.PUT);
temporaryEvent.put(temporaryPutKey, parsedJson);

final String trimmedPointer = trimPointer(pointer);
final String actualPointer = source + "/" + trimmedPointer;
Expand Down Expand Up @@ -170,15 +188,15 @@ private String normalizePointerStructure(final String pointer) {
return pointer.replace('/','.');
}

private String trimPointer(String pointer) {
private String trimPointer(final String pointer) {
final String trimmedLeadingSlash = pointer.startsWith("/") ? pointer.substring(1) : pointer;
return trimmedLeadingSlash.endsWith("/") ? trimmedLeadingSlash.substring(0, trimmedLeadingSlash.length() - 1) : trimmedLeadingSlash;
}

private void writeToRoot(final Event event, final Map<String, Object> parsedJson) {
for (Map.Entry<String, Object> entry : parsedJson.entrySet()) {
for (final Map.Entry<String, Object> entry : parsedJson.entrySet()) {
if (overwriteIfDestinationExists || !event.containsKey(entry.getKey())) {
event.put(entry.getKey(), entry.getValue());
event.put(eventKeyFactory.createEventKey(entry.getKey(), EventKeyFactory.EventAction.PUT), entry.getValue());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.processor.parse;

import java.util.List;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;

public interface CommonParseConfig {
/**
Expand Down Expand Up @@ -59,4 +60,10 @@ public interface CommonParseConfig {
* Defaults to false.
*/
boolean isDeleteSourceRequested();

/**
* An optional setting used to determine how to handle parsing errors. Default is skip, which includes logging the error
* and passing the failed Event downstream to the next processor.
*/
HandleFailedEventsOption getHandleFailedEventsOption();
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.dataformat.ion.IonObjectMapper;
import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor;
import org.slf4j.Logger;
Expand All @@ -26,17 +29,26 @@
@DataPrepperPlugin(name = "parse_ion", pluginType = Processor.class, pluginConfigurationType = ParseIonProcessorConfig.class)
public class ParseIonProcessor extends AbstractParseProcessor {
private static final Logger LOG = LoggerFactory.getLogger(ParseIonProcessor.class);
private static final String PARSE_ERRORS = "parseErrors";

private final IonObjectMapper objectMapper = new IonObjectMapper();

private final Counter parseErrorsCounter;

private final HandleFailedEventsOption handleFailedEventsOption;

@DataPrepperPluginConstructor
public ParseIonProcessor(final PluginMetrics pluginMetrics,
final ParseIonProcessorConfig parseIonProcessorConfig,
final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics, parseIonProcessorConfig, expressionEvaluator);
final ExpressionEvaluator expressionEvaluator,
final EventKeyFactory eventKeyFactory) {
super(pluginMetrics, parseIonProcessorConfig, expressionEvaluator, eventKeyFactory);

// Convert Timestamps to ISO-8601 Z strings
objectMapper.registerModule(new IonTimestampConverterModule());

handleFailedEventsOption = parseIonProcessorConfig.getHandleFailedEventsOption();
parseErrorsCounter = pluginMetrics.counter(PARSE_ERRORS);
}

@Override
Expand All @@ -45,10 +57,16 @@ protected Optional<HashMap<String, Object>> readValue(String message, Event cont
// We need to do a two-step process here, read the value in, then convert away any Ion types like Timestamp
return Optional.of(objectMapper.convertValue(objectMapper.readValue(message, new TypeReference<>() {}), new TypeReference<>() {}));
} catch (JsonProcessingException e) {
LOG.error(SENSITIVE, "An exception occurred due to invalid Ion while parsing [{}] due to {}", message, e.getMessage());
if (handleFailedEventsOption.shouldLog()) {
LOG.error(SENSITIVE, "An exception occurred due to invalid Ion while parsing [{}] due to {}", message, e.getMessage());
}
parseErrorsCounter.increment();
return Optional.empty();
} catch (Exception e) {
LOG.error(SENSITIVE, "An exception occurred while using the parse_ion processor while parsing [{}]", message, e);
if (handleFailedEventsOption.shouldLog()) {
LOG.error(SENSITIVE, "An exception occurred while using the parse_ion processor while parsing [{}]", message, e);
}
processingFailuresCounter.increment();
return Optional.empty();
}
}
Expand Down
Loading

0 comments on commit aca587f

Please sign in to comment.