
Address PR comments
Signed-off-by: Taylor Gray <tylgry@amazon.com>
graytaylor0 committed Feb 14, 2024
1 parent 5f518fc commit 3e06739
Showing 11 changed files with 145 additions and 34 deletions.
5 changes: 1 addition & 4 deletions data-prepper-plugins/decompress-processor/build.gradle
@@ -3,11 +3,8 @@
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

dependencies {
implementation 'commons-io:commons-io:2.15.1'
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation 'com.fasterxml.jackson.core:jackson-databind'
@@ -7,30 +7,29 @@

import com.google.common.base.Charsets;
import io.micrometer.core.instrument.Counter;
import org.apache.commons.io.IOUtils;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.processor.decompress.exceptions.DecodingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Collection;

@DataPrepperPlugin(name = "decompress", pluginType = Processor.class, pluginConfigurationType = DecompressProcessorConfig.class)
public class DecompressProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {

private static final Logger LOG = LoggerFactory.getLogger(DecompressProcessor.class);
static final String DECOMPRESSION_PROCESSING_ERRORS = "decompressionProcessingErrors";
static final String DECOMPRESSION_PROCESSING_ERRORS = "processingErrors";

private final DecompressProcessorConfig decompressProcessorConfig;
private final ExpressionEvaluator expressionEvaluator;
@@ -45,6 +44,13 @@ public DecompressProcessor(final PluginMetrics pluginMetrics,
this.decompressProcessorConfig = decompressProcessorConfig;
this.expressionEvaluator = expressionEvaluator;
this.decompressionProcessingErrors = pluginMetrics.counter(DECOMPRESSION_PROCESSING_ERRORS);

if (decompressProcessorConfig.getDecompressWhen() != null
&& !expressionEvaluator.isValidExpressionStatement(decompressProcessorConfig.getDecompressWhen())) {
throw new InvalidPluginConfigurationException(
String.format("decompress_when value of %s is not a valid expression statement. " +
"See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax.", decompressProcessorConfig.getDecompressWhen()));
}
}

@Override
@@ -66,10 +72,10 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor

final byte[] compressedValueAsBytes = decompressProcessorConfig.getEncodingType().getDecoderEngine().decode(compressedValue);

try (final InputStream inputStream = decompressProcessorConfig.getDecompressionType().getDecompressionEngine().createInputStream(new ByteArrayInputStream(compressedValueAsBytes));
final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, Charsets.UTF_8))
){
record.getData().put(key, getDecompressedString(bufferedReader));
try (final InputStream inputStream = decompressProcessorConfig.getDecompressionType().getDecompressionEngine().createInputStream(new ByteArrayInputStream(compressedValueAsBytes));){

final String decompressedString = IOUtils.toString(inputStream, Charsets.UTF_8);
record.getData().put(key, decompressedString);
} catch (final Exception e) {
LOG.error("Unable to decompress key {} using decompression type {}:",
key, decompressProcessorConfig.getDecompressionType(), e);
@@ -105,15 +111,4 @@ public boolean isReadyForShutdown() {
public void shutdown() {

}

private String getDecompressedString(final BufferedReader bufferedReader) throws IOException {
final StringBuilder stringBuilder = new StringBuilder();
String line;

while ((line = bufferedReader.readLine()) != null) {
stringBuilder.append(line);
}

return stringBuilder.toString();
}
}
@@ -10,7 +10,7 @@
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.plugins.processor.decompress.encoding.EncodingType;
import org.opensearch.dataprepper.plugins.processor.decompress.encoding.IEncodingType;
import org.opensearch.dataprepper.plugins.processor.decompress.encoding.DecoderEngineFactory;

import java.util.List;

@@ -37,11 +37,11 @@ public List<String> getKeys() {
return keys;
}

public IDecompressionType getDecompressionType() {
public DecompressionEngineFactory getDecompressionType() {
return decompressionType;
}

public IEncodingType getEncodingType() { return encodingType; }
public DecoderEngineFactory getEncodingType() { return encodingType; }

public String getDecompressWhen() {
return decompressWhen;
@@ -7,6 +7,6 @@

import org.opensearch.dataprepper.model.codec.DecompressionEngine;

public interface IDecompressionType {
public interface DecompressionEngineFactory {
public DecompressionEngine getDecompressionEngine();
}
@@ -13,7 +13,7 @@
import java.util.Map;
import java.util.stream.Collectors;

public enum DecompressionType implements IDecompressionType {
public enum DecompressionType implements DecompressionEngineFactory {
GZIP("gzip");

private final String option;
@@ -5,6 +5,6 @@

package org.opensearch.dataprepper.plugins.processor.decompress.encoding;

public interface IEncodingType {
public interface DecoderEngineFactory {
DecoderEngine getDecoderEngine();
}
@@ -11,7 +11,7 @@
import java.util.Map;
import java.util.stream.Collectors;

public enum EncodingType implements IEncodingType {
public enum EncodingType implements DecoderEngineFactory {
BASE64("base64");

private final String option;
@@ -28,7 +28,7 @@
import static org.opensearch.dataprepper.plugins.processor.decompress.DecompressProcessorTest.buildRecordWithEvent;

@ExtendWith(MockitoExtension.class)
public class ITDecompressProcessorTest {
public class DecompressProcessorIT {

private List<String> keys;

@@ -18,9 +18,10 @@
import org.opensearch.dataprepper.model.codec.DecompressionEngine;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.processor.decompress.encoding.DecoderEngine;
import org.opensearch.dataprepper.plugins.processor.decompress.encoding.IEncodingType;
import org.opensearch.dataprepper.plugins.processor.decompress.encoding.DecoderEngineFactory;
import org.opensearch.dataprepper.plugins.processor.decompress.exceptions.DecodingException;

import java.io.ByteArrayInputStream;
@@ -33,6 +34,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@@ -65,10 +67,10 @@ public class DecompressProcessorTest {
private DecompressProcessorConfig decompressProcessorConfig;

@Mock
private IDecompressionType decompressionType;
private DecompressionEngineFactory decompressionType;

@Mock
private IEncodingType encodingType;
private DecoderEngineFactory encodingType;

private DecompressProcessor createObjectUnderTest() {
return new DecompressProcessor(pluginMetrics, decompressProcessorConfig, expressionEvaluator);
@@ -180,6 +182,7 @@ void exception_from_expression_evaluator_adds_tags_and_increments_error_metric()

when(decompressProcessorConfig.getTagsOnFailure()).thenReturn(List.of(tagForFailure));
when(decompressProcessorConfig.getDecompressWhen()).thenReturn(decompressWhen);
when(expressionEvaluator.isValidExpressionStatement(decompressWhen)).thenReturn(true);
when(expressionEvaluator.evaluateConditional(eq(decompressWhen), any(Event.class)))
.thenThrow(RuntimeException.class);

@@ -201,6 +204,16 @@ void exception_from_expression_evaluator_adds_tags_and_increments_error_metric()
verify(decompressionProcessingErrors).increment();
}

@Test
void invalid_expression_statement_throws_InvalidPluginConfigurationException() {

final String decompressWhen = UUID.randomUUID().toString();
when(decompressProcessorConfig.getDecompressWhen()).thenReturn(decompressWhen);
when(expressionEvaluator.isValidExpressionStatement(decompressWhen)).thenReturn(false);

assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest);
}

static Record<Event> buildRecordWithEvent(final Map<String, Object> data) {
return new Record<>(JacksonEvent.builder()
.withData(data)
@@ -0,0 +1,54 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress;

import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.opensearch.dataprepper.model.codec.DecompressionEngine;
import org.opensearch.dataprepper.plugins.codec.GZipDecompressionEngine;

import java.util.stream.Stream;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.params.provider.Arguments.arguments;

public class DecompressionTypeTest {

@ParameterizedTest
@ArgumentsSource(EnumToStringNameArgumentsProvider.class)
void fromOptionValue_returns_expected_DecompressionType(final DecompressionType expectedEnumValue, final String enumName) {
assertThat(DecompressionType.fromOptionValue(enumName), equalTo(expectedEnumValue));
}

@ParameterizedTest
@ArgumentsSource(EnumToDecompressionEngineClassArgumentsProvider.class)
void getDecompressionEngine_returns_expected_DecompressionEngine(final DecompressionType enumValue, final Class<DecompressionEngine> decompressionEngineClass) {
assertThat(enumValue.getDecompressionEngine(), instanceOf(decompressionEngineClass));
}

private static class EnumToStringNameArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
return Stream.of(
arguments(DecompressionType.GZIP, "gzip")
);
}
}

private static class EnumToDecompressionEngineClassArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
return Stream.of(
arguments(DecompressionType.GZIP, GZipDecompressionEngine.class)
);
}
}
}
@@ -0,0 +1,52 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress.encoding;

import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;

import java.util.stream.Stream;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.params.provider.Arguments.arguments;

public class EncodingTypeTest {

@ParameterizedTest
@ArgumentsSource(EnumToStringNameArgumentsProvider.class)
void fromOptionValue_returns_expected_DecompressionType(final EncodingType expectedEnumValue, final String enumName) {
assertThat(EncodingType.fromOptionValue(enumName), equalTo(expectedEnumValue));
}

@ParameterizedTest
@ArgumentsSource(EnumToDecoderEngineClassArgumentsProvider.class)
void getDecompressionEngine_returns_expected_DecompressionEngine(final EncodingType enumValue, final Class<DecoderEngine> decoderEngineClass) {
assertThat(enumValue.getDecoderEngine(), instanceOf(decoderEngineClass));
}

private static class EnumToStringNameArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
return Stream.of(
arguments(EncodingType.BASE64, "base64")
);
}
}

private static class EnumToDecoderEngineClassArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
return Stream.of(
arguments(EncodingType.BASE64, Base64DecoderEngine.class)
);
}
}
}
