From 5f518fc6d19e2cfd18265c82bd63fd766c6d6d5e Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Mon, 29 Jan 2024 10:41:03 -0600 Subject: [PATCH] Create decompress processor to decompress gzipped keys Signed-off-by: Taylor Gray --- .../decompress-processor/build.gradle | 16 ++ .../decompress/DecompressProcessor.java | 119 ++++++++++ .../decompress/DecompressProcessorConfig.java | 53 +++++ .../decompress/DecompressionType.java | 44 ++++ .../decompress/IDecompressionType.java | 12 + .../encoding/Base64DecoderEngine.java | 21 ++ .../decompress/encoding/DecoderEngine.java | 12 + .../decompress/encoding/EncodingType.java | 42 ++++ .../decompress/encoding/IEncodingType.java | 10 + .../exceptions/DecodingException.java | 12 + .../decompress/DecompressProcessorTest.java | 210 ++++++++++++++++++ .../decompress/ITDecompressProcessorTest.java | 72 ++++++ .../encoding/Base64DecoderEngineTest.java | 52 +++++ settings.gradle | 1 + 14 files changed, 676 insertions(+) create mode 100644 data-prepper-plugins/decompress-processor/build.gradle create mode 100644 data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessor.java create mode 100644 data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorConfig.java create mode 100644 data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressionType.java create mode 100644 data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/IDecompressionType.java create mode 100644 data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/Base64DecoderEngine.java create mode 100644 data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/DecoderEngine.java create 
mode 100644 data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/EncodingType.java create mode 100644 data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/IEncodingType.java create mode 100644 data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/exceptions/DecodingException.java create mode 100644 data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorTest.java create mode 100644 data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/ITDecompressProcessorTest.java create mode 100644 data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/Base64DecoderEngineTest.java diff --git a/data-prepper-plugins/decompress-processor/build.gradle b/data-prepper-plugins/decompress-processor/build.gradle new file mode 100644 index 0000000000..4ef54f729c --- /dev/null +++ b/data-prepper-plugins/decompress-processor/build.gradle @@ -0,0 +1,16 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +plugins { + id 'java' +} + +dependencies { + implementation project(':data-prepper-api') + implementation project(':data-prepper-plugins:common') + implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation 'io.micrometer:micrometer-core' + testImplementation testLibs.mockito.inline +} \ No newline at end of file diff --git a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessor.java b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessor.java new file mode 100644 index 0000000000..7c731123c9 --- 
/dev/null +++ b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessor.java @@ -0,0 +1,119 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.decompress; + +import com.google.common.base.Charsets; +import io.micrometer.core.instrument.Counter; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.processor.AbstractProcessor; +import org.opensearch.dataprepper.model.processor.Processor; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.processor.decompress.exceptions.DecodingException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Collection; + +@DataPrepperPlugin(name = "decompress", pluginType = Processor.class, pluginConfigurationType = DecompressProcessorConfig.class) +public class DecompressProcessor extends AbstractProcessor, Record> { + + private static final Logger LOG = LoggerFactory.getLogger(DecompressProcessor.class); + static final String DECOMPRESSION_PROCESSING_ERRORS = "decompressionProcessingErrors"; + + private final DecompressProcessorConfig decompressProcessorConfig; + private final ExpressionEvaluator expressionEvaluator; + + private final Counter decompressionProcessingErrors; + + @DataPrepperPluginConstructor + public DecompressProcessor(final PluginMetrics pluginMetrics, + final DecompressProcessorConfig 
decompressProcessorConfig, + final ExpressionEvaluator expressionEvaluator) { + super(pluginMetrics); + this.decompressProcessorConfig = decompressProcessorConfig; + this.expressionEvaluator = expressionEvaluator; + this.decompressionProcessingErrors = pluginMetrics.counter(DECOMPRESSION_PROCESSING_ERRORS); + } + + @Override + public Collection> doExecute(final Collection> records) { + for (final Record record : records) { + + try { + if (decompressProcessorConfig.getDecompressWhen() != null && !expressionEvaluator.evaluateConditional(decompressProcessorConfig.getDecompressWhen(), record.getData())) { + continue; + } + + for (final String key : decompressProcessorConfig.getKeys()) { + + final String compressedValue = record.getData().get(key, String.class); + + if (compressedValue == null) { + continue; + } + + final byte[] compressedValueAsBytes = decompressProcessorConfig.getEncodingType().getDecoderEngine().decode(compressedValue); + + try (final InputStream inputStream = decompressProcessorConfig.getDecompressionType().getDecompressionEngine().createInputStream(new ByteArrayInputStream(compressedValueAsBytes)); + final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, Charsets.UTF_8)) + ){ + record.getData().put(key, getDecompressedString(bufferedReader)); + } catch (final Exception e) { + LOG.error("Unable to decompress key {} using decompression type {}:", + key, decompressProcessorConfig.getDecompressionType(), e); + record.getData().getMetadata().addTags(decompressProcessorConfig.getTagsOnFailure()); + decompressionProcessingErrors.increment(); + } + } + } catch (final DecodingException e) { + LOG.error("Unable to decode key with base64: {}", e.getMessage()); + record.getData().getMetadata().addTags(decompressProcessorConfig.getTagsOnFailure()); + decompressionProcessingErrors.increment(); + } catch (final Exception e) { + LOG.error("An uncaught exception occurred while decompressing Events", e); + 
record.getData().getMetadata().addTags(decompressProcessorConfig.getTagsOnFailure()); + decompressionProcessingErrors.increment(); + } + } + + return records; + } + + @Override + public void prepareForShutdown() { + + } + + @Override + public boolean isReadyForShutdown() { + return true; + } + + @Override + public void shutdown() { + + } + + private String getDecompressedString(final BufferedReader bufferedReader) throws IOException { + final StringBuilder stringBuilder = new StringBuilder(); + String line; + + while ((line = bufferedReader.readLine()) != null) { + stringBuilder.append(line); + } + + return stringBuilder.toString(); + } +} diff --git a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorConfig.java b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorConfig.java new file mode 100644 index 0000000000..81000f07b3 --- /dev/null +++ b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorConfig.java @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.decompress; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import org.opensearch.dataprepper.plugins.processor.decompress.encoding.EncodingType; +import org.opensearch.dataprepper.plugins.processor.decompress.encoding.IEncodingType; + +import java.util.List; + +public class DecompressProcessorConfig { + + @JsonProperty("keys") + @NotEmpty + private List keys; + + @JsonProperty("type") + @NotNull + private DecompressionType decompressionType; + + @JsonProperty("decompress_when") + private String decompressWhen; 
+ + @JsonProperty("tags_on_failure") + private List tagsOnFailure = List.of("_decompression_failure"); + + @JsonIgnore + private final EncodingType encodingType = EncodingType.BASE64; + + public List getKeys() { + return keys; + } + + public IDecompressionType getDecompressionType() { + return decompressionType; + } + + public IEncodingType getEncodingType() { return encodingType; } + + public String getDecompressWhen() { + return decompressWhen; + } + + public List getTagsOnFailure() { + return tagsOnFailure; + } +} diff --git a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressionType.java b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressionType.java new file mode 100644 index 0000000000..ad127c7290 --- /dev/null +++ b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressionType.java @@ -0,0 +1,44 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.decompress; + +import com.fasterxml.jackson.annotation.JsonCreator; +import org.opensearch.dataprepper.model.codec.DecompressionEngine; +import org.opensearch.dataprepper.plugins.codec.GZipDecompressionEngine; + +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; + +public enum DecompressionType implements IDecompressionType { + GZIP("gzip"); + + private final String option; + + private static final Map OPTIONS_MAP = Arrays.stream(DecompressionType.values()) + .collect(Collectors.toMap( + value -> value.option, + value -> value + )); + + private static final Map DECOMPRESSION_ENGINE_MAP = Map.of( + "gzip", new GZipDecompressionEngine() + ); + + DecompressionType(final String option) { + this.option = option; + } + + @JsonCreator + static DecompressionType fromOptionValue(final 
String option) { + return OPTIONS_MAP.get(option); + } + + @Override + public DecompressionEngine getDecompressionEngine() { + return DECOMPRESSION_ENGINE_MAP.get(this.option); + } +} diff --git a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/IDecompressionType.java b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/IDecompressionType.java new file mode 100644 index 0000000000..6853eb3228 --- /dev/null +++ b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/IDecompressionType.java @@ -0,0 +1,12 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.decompress; + +import org.opensearch.dataprepper.model.codec.DecompressionEngine; + +public interface IDecompressionType { + public DecompressionEngine getDecompressionEngine(); +} diff --git a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/Base64DecoderEngine.java b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/Base64DecoderEngine.java new file mode 100644 index 0000000000..a6d59d84ed --- /dev/null +++ b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/Base64DecoderEngine.java @@ -0,0 +1,21 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.decompress.encoding; + +import org.opensearch.dataprepper.plugins.processor.decompress.exceptions.DecodingException; + +import java.util.Base64; + +public class Base64DecoderEngine implements DecoderEngine { + @Override + public byte[] decode(final String encodedValue) { + try { + return 
Base64.getDecoder().decode(encodedValue); + } catch (final Exception e) { + throw new DecodingException(String.format("There was an error decoding with the base64 encoding type: %s", e.getMessage())); + } + } +} diff --git a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/DecoderEngine.java b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/DecoderEngine.java new file mode 100644 index 0000000000..ef443c273f --- /dev/null +++ b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/DecoderEngine.java @@ -0,0 +1,12 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.decompress.encoding; + +import org.opensearch.dataprepper.plugins.processor.decompress.exceptions.DecodingException; + +public interface DecoderEngine { + byte[] decode(final String encodedValue) throws DecodingException; +} diff --git a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/EncodingType.java b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/EncodingType.java new file mode 100644 index 0000000000..43b9f18fa7 --- /dev/null +++ b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/EncodingType.java @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.decompress.encoding; + +import com.fasterxml.jackson.annotation.JsonCreator; + +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; + +public enum EncodingType implements IEncodingType { + 
BASE64("base64"); + + private final String option; + + private static final Map OPTIONS_MAP = Arrays.stream(EncodingType.values()) + .collect(Collectors.toMap( + value -> value.option, + value -> value + )); + + private static final Map DECODER_ENGINE_MAP = Map.of( + "base64", new Base64DecoderEngine() + ); + + EncodingType(final String option) { + this.option = option; + } + + @JsonCreator + static EncodingType fromOptionValue(final String option) { + return OPTIONS_MAP.get(option); + } + + @Override + public DecoderEngine getDecoderEngine() { + return DECODER_ENGINE_MAP.get(this.option); + } +} diff --git a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/IEncodingType.java b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/IEncodingType.java new file mode 100644 index 0000000000..3b513523c7 --- /dev/null +++ b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/IEncodingType.java @@ -0,0 +1,10 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.decompress.encoding; + +public interface IEncodingType { + DecoderEngine getDecoderEngine(); +} diff --git a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/exceptions/DecodingException.java b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/exceptions/DecodingException.java new file mode 100644 index 0000000000..f14e67d92c --- /dev/null +++ b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/exceptions/DecodingException.java @@ -0,0 +1,12 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + 
+package org.opensearch.dataprepper.plugins.processor.decompress.exceptions; + +public class DecodingException extends RuntimeException { + public DecodingException(final String message) { + super(message); + } +} diff --git a/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorTest.java b/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorTest.java new file mode 100644 index 0000000000..542058c2a2 --- /dev/null +++ b/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorTest.java @@ -0,0 +1,210 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.decompress; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.MetricNames; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.codec.DecompressionEngine; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.processor.decompress.encoding.DecoderEngine; +import org.opensearch.dataprepper.plugins.processor.decompress.encoding.IEncodingType; +import org.opensearch.dataprepper.plugins.processor.decompress.exceptions.DecodingException; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import 
java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.processor.decompress.DecompressProcessor.DECOMPRESSION_PROCESSING_ERRORS; + +@ExtendWith(MockitoExtension.class) +public class DecompressProcessorTest { + + private String key; + + @Mock + private DecompressionEngine decompressionEngine; + + @Mock + private DecoderEngine decoderEngine; + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private Counter decompressionProcessingErrors; + + @Mock + private ExpressionEvaluator expressionEvaluator; + + @Mock + private DecompressProcessorConfig decompressProcessorConfig; + + @Mock + private IDecompressionType decompressionType; + + @Mock + private IEncodingType encodingType; + + private DecompressProcessor createObjectUnderTest() { + return new DecompressProcessor(pluginMetrics, decompressProcessorConfig, expressionEvaluator); + } + + @BeforeEach + void setup() { + key = UUID.randomUUID().toString(); + + when(pluginMetrics.counter(MetricNames.RECORDS_IN)).thenReturn(mock(Counter.class)); + when(pluginMetrics.counter(MetricNames.RECORDS_OUT)).thenReturn(mock(Counter.class)); + when(pluginMetrics.timer(MetricNames.TIME_ELAPSED)).thenReturn(mock(Timer.class)); + when(pluginMetrics.counter(DECOMPRESSION_PROCESSING_ERRORS)).thenReturn(decompressionProcessingErrors); + } + + @Test + void decompression_returns_expected_output() throws IOException { + final String compressedValue = UUID.randomUUID().toString(); + final String expectedResult = UUID.randomUUID().toString(); + final byte[] 
decodedValue = expectedResult.getBytes(); + + when(decompressProcessorConfig.getKeys()).thenReturn(List.of(key)); + when(encodingType.getDecoderEngine()).thenReturn(decoderEngine); + when(decompressProcessorConfig.getEncodingType()).thenReturn(encodingType); + when(decompressProcessorConfig.getDecompressionType()).thenReturn(decompressionType); + when(decompressionType.getDecompressionEngine()).thenReturn(decompressionEngine); + when(decoderEngine.decode(compressedValue)).thenReturn(decodedValue); + when(decompressionEngine.createInputStream(any(InputStream.class))).thenReturn(new ByteArrayInputStream(decodedValue)); + + final List> records = List.of(buildRecordWithEvent(Map.of(key, compressedValue))); + + final DecompressProcessor objectUnderTest = createObjectUnderTest(); + final List> result = (List>) objectUnderTest.doExecute(records); + + assertThat(result, notNullValue()); + assertThat(result.size(), equalTo(1)); + assertThat(result.get(0), notNullValue()); + assertThat(result.get(0).getData(), notNullValue()); + assertThat(result.get(0).getData().get(key, String.class), equalTo(expectedResult)); + } + + @Test + void decompression_with_decoding_error_adds_tags_and_increments_error_metric() { + final String compressedValue = UUID.randomUUID().toString(); + final String tagForFailure = UUID.randomUUID().toString(); + + when(decompressProcessorConfig.getKeys()).thenReturn(List.of(key)); + when(encodingType.getDecoderEngine()).thenReturn(decoderEngine); + when(decompressProcessorConfig.getEncodingType()).thenReturn(encodingType); + when(decompressProcessorConfig.getTagsOnFailure()).thenReturn(List.of(tagForFailure)); + when(decoderEngine.decode(compressedValue)).thenThrow(DecodingException.class); + + final List> records = List.of(buildRecordWithEvent(Map.of(key, compressedValue))); + + final DecompressProcessor objectUnderTest = createObjectUnderTest(); + final List> result = (List>) objectUnderTest.doExecute(records); + + assertThat(result, notNullValue()); + 
assertThat(result.size(), equalTo(1)); + assertThat(result.get(0), notNullValue()); + assertThat(result.get(0).getData(), notNullValue()); + assertThat(result.get(0).getData().get(key, String.class), equalTo(compressedValue)); + assertThat(result.get(0).getData().getMetadata().getTags(), notNullValue()); + assertThat(result.get(0).getData().getMetadata().getTags().size(), equalTo(1)); + assertThat(result.get(0).getData().getMetadata().getTags().contains(tagForFailure), equalTo(true)); + + verifyNoInteractions(decompressionEngine); + verify(decompressionProcessingErrors).increment(); + } + + @Test + void exception_from_DecompressionEngine_adds_tags_and_increments_error_metric() throws IOException { + final String compressedValue = UUID.randomUUID().toString(); + final String expectedResult = UUID.randomUUID().toString(); + final byte[] decodedValue = expectedResult.getBytes(); + final String tagForFailure = UUID.randomUUID().toString(); + + when(decompressProcessorConfig.getKeys()).thenReturn(List.of(key)); + when(encodingType.getDecoderEngine()).thenReturn(decoderEngine); + when(decompressProcessorConfig.getEncodingType()).thenReturn(encodingType); + when(decompressProcessorConfig.getTagsOnFailure()).thenReturn(List.of(tagForFailure)); + when(decompressProcessorConfig.getDecompressionType()).thenReturn(decompressionType); + when(decompressionType.getDecompressionEngine()).thenReturn(decompressionEngine); + when(decoderEngine.decode(compressedValue)).thenReturn(decodedValue); + when(decompressionEngine.createInputStream(any(InputStream.class))).thenThrow(RuntimeException.class); + + final List> records = List.of(buildRecordWithEvent(Map.of(key, compressedValue))); + + final DecompressProcessor objectUnderTest = createObjectUnderTest(); + final List> result = (List>) objectUnderTest.doExecute(records); + + assertThat(result, notNullValue()); + assertThat(result.size(), equalTo(1)); + assertThat(result.get(0), notNullValue()); + assertThat(result.get(0).getData(), 
notNullValue()); + assertThat(result.get(0).getData().get(key, String.class), equalTo(compressedValue)); + assertThat(result.get(0).getData().getMetadata().getTags(), notNullValue()); + assertThat(result.get(0).getData().getMetadata().getTags().size(), equalTo(1)); + assertThat(result.get(0).getData().getMetadata().getTags().contains(tagForFailure), equalTo(true)); + + verify(decompressionProcessingErrors).increment(); + } + + @Test + void exception_from_expression_evaluator_adds_tags_and_increments_error_metric() { + final String decompressWhen = UUID.randomUUID().toString(); + final String compressedValue = UUID.randomUUID().toString(); + final String tagForFailure = UUID.randomUUID().toString(); + + when(decompressProcessorConfig.getTagsOnFailure()).thenReturn(List.of(tagForFailure)); + when(decompressProcessorConfig.getDecompressWhen()).thenReturn(decompressWhen); + when(expressionEvaluator.evaluateConditional(eq(decompressWhen), any(Event.class))) + .thenThrow(RuntimeException.class); + + final List> records = List.of(buildRecordWithEvent(Map.of(key, compressedValue))); + + final DecompressProcessor objectUnderTest = createObjectUnderTest(); + final List> result = (List>) objectUnderTest.doExecute(records); + + assertThat(result, notNullValue()); + assertThat(result.size(), equalTo(1)); + assertThat(result.get(0), notNullValue()); + assertThat(result.get(0).getData(), notNullValue()); + assertThat(result.get(0).getData().get(key, String.class), equalTo(compressedValue)); + assertThat(result.get(0).getData().getMetadata().getTags(), notNullValue()); + assertThat(result.get(0).getData().getMetadata().getTags().size(), equalTo(1)); + assertThat(result.get(0).getData().getMetadata().getTags().contains(tagForFailure), equalTo(true)); + + verifyNoInteractions(decoderEngine, decompressionEngine); + verify(decompressionProcessingErrors).increment(); + } + + static Record buildRecordWithEvent(final Map data) { + return new Record<>(JacksonEvent.builder() + 
.withData(data) + .withEventType("event") + .build()); + } +} diff --git a/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/ITDecompressProcessorTest.java b/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/ITDecompressProcessorTest.java new file mode 100644 index 0000000000..4950893af8 --- /dev/null +++ b/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/ITDecompressProcessorTest.java @@ -0,0 +1,72 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.decompress; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.processor.decompress.encoding.EncodingType; + +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.processor.decompress.DecompressProcessorTest.buildRecordWithEvent; + +@ExtendWith(MockitoExtension.class) +public class ITDecompressProcessorTest { + + private List keys; + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private ExpressionEvaluator expressionEvaluator; + + @Mock + private DecompressProcessorConfig 
decompressProcessorConfig; + + private DecompressProcessor createObjectUnderTest() { + return new DecompressProcessor(pluginMetrics, decompressProcessorConfig, expressionEvaluator); + } + + @BeforeEach + void setup() { + keys = List.of(UUID.randomUUID().toString()); + when(decompressProcessorConfig.getKeys()).thenReturn(keys); + } + + @ParameterizedTest + @CsvSource({"H4sIAAAAAAAAAPNIzcnJVyjPL8pJAQBSntaLCwAAAA==,Hello world", + "H4sIAAAAAAAAAwvJyCxWAKJEhYKcxMy8ktSKEoXikqLMvHQAkJ3GfRoAAAA=,This is a plaintext string"}) + void base64_encoded_gzip_is_decompressed_successfully(final String compressedValue, final String expectedDecompressedValue) { + when(decompressProcessorConfig.getEncodingType()).thenReturn(EncodingType.BASE64); + when(decompressProcessorConfig.getDecompressionType()).thenReturn(DecompressionType.GZIP); + + final DecompressProcessor objectUnderTest = createObjectUnderTest(); + final List> records = List.of(buildRecordWithEvent(Map.of(keys.get(0), compressedValue))); + + final List> result = (List>) objectUnderTest.doExecute(records); + + assertThat(result, notNullValue()); + assertThat(result.size(), equalTo(1)); + assertThat(result.get(0), notNullValue()); + assertThat(result.get(0).getData(), notNullValue()); + assertThat(result.get(0).getData().get(keys.get(0), String.class), equalTo(expectedDecompressedValue)); + } +} diff --git a/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/Base64DecoderEngineTest.java b/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/Base64DecoderEngineTest.java new file mode 100644 index 0000000000..989a4a067a --- /dev/null +++ b/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/Base64DecoderEngineTest.java @@ -0,0 +1,52 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + 
+package org.opensearch.dataprepper.plugins.processor.decompress.encoding; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.mockito.MockedStatic; +import org.opensearch.dataprepper.plugins.processor.decompress.exceptions.DecodingException; + +import java.util.Base64; +import java.util.UUID; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; + +public class Base64DecoderEngineTest { + + @ParameterizedTest + @CsvSource(value = {"Hello world,SGVsbG8gd29ybGQ=", "Test123,VGVzdDEyMw=="}) + void decode_correctly_decodes_base64(final String expectedDecodedValue, final String base64EncodedValue) { + final byte[] expectedDecodedBytes = expectedDecodedValue.getBytes(); + + final DecoderEngine objectUnderTest = new Base64DecoderEngine(); + + final byte[] decodedBytes = objectUnderTest.decode(base64EncodedValue); + + assertThat(decodedBytes, equalTo(expectedDecodedBytes)); + } + + @Test + void decode_throws_DecodingException_when_decoding_base64_throws_exception() { + final String encodedValue = UUID.randomUUID().toString(); + final Base64.Decoder decoder = mock(Base64.Decoder.class); + when(decoder.decode(encodedValue)).thenThrow(RuntimeException.class); + + try(final MockedStatic base64MockedStatic = mockStatic(Base64.class)) { + base64MockedStatic.when(Base64::getDecoder).thenReturn(decoder); + + final DecoderEngine objectUnderTest = new Base64DecoderEngine(); + + assertThrows(DecodingException.class, () -> objectUnderTest.decode(encodedValue)); + } + } +} diff --git a/settings.gradle b/settings.gradle index c5d0d6c916..1aa72e7e12 100644 --- a/settings.gradle +++ b/settings.gradle @@ -150,3 +150,4 @@ include 
'data-prepper-plugins:buffer-common' //include 'data-prepper-plugins:prometheus-sink' include 'data-prepper-plugins:dissect-processor' include 'data-prepper-plugins:dynamodb-source' +include 'data-prepper-plugins:decompress-processor'