Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Srikanth Govindarajan <srigovs@amazon.com>
  • Loading branch information
srikanthjg committed Sep 13, 2024
1 parent e0adff4 commit a444078
Show file tree
Hide file tree
Showing 18 changed files with 159 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ public final class DataPrepperMarkers {

static {
EVENT.add(SENSITIVE);
NOISY.add(SENSITIVE);
}

private DataPrepperMarkers() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public Collection<Record<?>> execute(final Collection<Record<?>> records) {
try {
Thread.sleep(delayDuration.toMillis());
} catch (final InterruptedException ex) {
LOG.error(NOISY,"Interrupted during delay processor", ex);
LOG.error(NOISY, "Interrupted during delay processor", ex);
}
return records;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
Expand Down Expand Up @@ -83,7 +84,13 @@ public Collection<Record<Event>> execute(final Collection<Record<Event>> records
.build();
modifiedRecords.add(new Record<>(newRecordEvent));
} catch (JsonProcessingException e) {
LOG.error(NOISY, "Unable to process Event data: {}", eventJson, e);
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("Unable to process Event data: {}")
.addArgument(eventJson)
.setCause(e)
.log();
}
}
return modifiedRecords;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
Expand Down Expand Up @@ -104,7 +105,13 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
}
} catch (final IOException e) {
csvInvalidEventsCounter.increment();
LOG.error(NOISY, "An exception occurred while reading event [{}]", event, e);
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("An exception occurred while reading event [{}]")
.addArgument(event)
.setCause(e)
.log();
}
}
return records;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.dataprepper.plugins.processor.dissect;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
Expand All @@ -26,8 +28,6 @@
import java.util.Map;
import java.util.Objects;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;


@DataPrepperPlugin(name = "dissect", pluginType = Processor.class, pluginConfigurationType = DissectProcessorConfig.class)
public class DissectProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
Expand Down Expand Up @@ -72,7 +72,13 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
}
}
} catch (Exception ex){
LOG.error(NOISY, "Error dissecting the event [{}] ", record.getData(), ex);
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("Error dissecting the event [{}]")
.addArgument(record.getData())
.setCause(ex)
.log();
}
}
return records;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
Expand Down Expand Up @@ -152,11 +153,25 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor

} catch (final TimeoutException e) {
event.getMetadata().addTags(tagsOnTimeout);
LOG.error(NOISY, "Matching on record [{}] took longer than [{}] and timed out", record.getData(), grokProcessorConfig.getTimeoutMillis());
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("Matching on record [{}] took longer than [{}] and timed out")
.addArgument(record.getData())
.addArgument(grokProcessorConfig.getTimeoutMillis())
.log();

grokProcessingTimeoutsCounter.increment();
} catch (final ExecutionException | InterruptedException | RuntimeException e) {
event.getMetadata().addTags(tagsOnMatchFailure);
LOG.error(NOISY, "An exception occurred when matching record [{}]", record.getData(), e);
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("An exception occurred when matching record [{}]")
.addArgument(record.getData())
.setCause(e)
.log();

grokProcessingErrorsCounter.increment();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.processor.mutateevent;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
Expand Down Expand Up @@ -91,12 +92,28 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
}
}
} catch (Exception e) {
LOG.error(NOISY, "Error adding entry to record [{}] with key [{}], metadataKey [{}], value_expression [{}] format [{}], value [{}]",
recordEvent, entry.getKey(), entry.getMetadataKey(), entry.getValueExpression(), entry.getFormat(), entry.getValue(), e);
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("Error adding entry to record [{}] with key [{}], metadataKey [{}], value_expression [{}] format [{}], value [{}]")
.addArgument(recordEvent)
.addArgument(entry.getKey())
.addArgument(entry.getMetadataKey())
.addArgument(entry.getValueExpression())
.addArgument(entry.getFormat())
.addArgument(entry.getValue())
.setCause(e)
.log();
}
}
} catch(final Exception e){
LOG.error(NOISY, "There was an exception while processing Event [{}]", recordEvent, e);
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("There was an exception while processing Event [{}]")
.addArgument(recordEvent)
.setCause(e)
.log();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,13 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
}
}
} catch (final Exception e) {
LOG.error(NOISY, "There was an exception while processing Event [{}]", recordEvent, e);
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("There was an exception while processing Event [{}]")
.addArgument(recordEvent)
.setCause(e)
.log();
recordEvent.getMetadata().addTags(tagsOnFailure);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.processor.mutateevent;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
Expand Down Expand Up @@ -92,7 +93,13 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
}
}
} catch (final Exception e) {
LOG.error(NOISY, "There was an exception while processing Event [{}]", recordEvent, e);
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("There was an exception while processing Event [{}]")
.addArgument(recordEvent)
.setCause(e)
.log();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.processor.mutateevent;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
Expand Down Expand Up @@ -61,7 +62,13 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
recordEvent.delete(entry);
}
} catch (final Exception e) {
LOG.error(NOISY, "There was an exception while processing Event [{}]", recordEvent, e);
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("There was an exception while processing Event [{}]")
.addArgument(recordEvent)
.setCause(e)
.log();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,37 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
recordEvent.getMetadata().addTags(config.getTagsOnFailure());
continue;
} catch (final Exception e) {
LOG.error(NOISY, "Error converting source list to map on record [{}]", recordEvent, e);
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("Error converting source list to map on record [{}]")
.addArgument(recordEvent)
.setCause(e)
.log();
recordEvent.getMetadata().addTags(config.getTagsOnFailure());
continue;
}

try {
updateEvent(recordEvent, targetMap);
} catch (final Exception e) {
LOG.error(NOISY, "Error updating record [{}] after converting source list to map", recordEvent, e);
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("Error updating record [{}] after converting source list to map")
.addArgument(recordEvent)
.setCause(e)
.log();
recordEvent.getMetadata().addTags(config.getTagsOnFailure());
}
} catch (final Exception e) {
LOG.error(NOISY, "There was an exception while processing Event [{}]", recordEvent, e);
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("There was an exception while processing Event [{}]")
.addArgument(recordEvent)
.setCause(e)
.log();
recordEvent.getMetadata().addTags(config.getTagsOnFailure());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
Expand Down Expand Up @@ -91,11 +92,22 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
recordEvent.put(config.getTarget(), targetList);
}
} catch (Exception e) {
LOG.error(NOISY,"Fail to perform Map to List operation", e);
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("Fail to perform Map to List operation")
.setCause(e)
.log();
recordEvent.getMetadata().addTags(config.getTagsOnFailure());
}
} catch (final Exception e) {
LOG.error(NOISY, "There was an exception while processing Event [{}]", recordEvent, e);
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("There was an exception while processing Event [{}]")
.addArgument(recordEvent)
.setCause(e)
.log();
recordEvent.getMetadata().addTags(config.getTagsOnFailure());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.processor.mutateevent;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
Expand Down Expand Up @@ -70,6 +71,13 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
}
} catch (final Exception e) {
LOG.error(NOISY, "There was an exception while processing Event [{}]", recordEvent, e);
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("There was an exception while processing Event [{}]")
.addArgument(recordEvent)
.setCause(e)
.log();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
performStringAction(recordEvent);
} catch (final Exception e) {
LOG.error(NOISY, "There was an exception while processing Event [{}]", recordEvent, e);
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("There was an exception while processing Event [{}]")
.addArgument(recordEvent)
.setCause(e)
.log();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ public ObfuscationProcessor(final PluginMetrics pluginMetrics,
patterns.add(p);
} catch (Exception e) {
LOG.error(NOISY,e.getMessage());
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage(e.getMessage())
.log();
throw new InvalidPluginConfigurationException("Invalid Pattern: \"" + rawPattern + "\" for source field " + this.source);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.commons.lang3.Range;
import org.apache.commons.lang3.math.NumberUtils;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
Expand Down Expand Up @@ -66,8 +67,14 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
translateSource(sourceObject, recordEvent, targetConfig);
}
} catch (Exception ex) {
LOG.error(NOISY, "Error mapping the source [{}] of entry [{}]", mappingConfig.getSource(),
record.getData(), ex);
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("Error mapping the source [{}] of entry [{}]")
.addArgument(mappingConfig.getSource())
.addArgument(record.getData())
.setCause(ex)
.log();
}
}
}
Expand Down
Loading

0 comments on commit a444078

Please sign in to comment.