Skip to content

Commit

Permalink
Create decompress processor to decompress gzipped keys
Browse files Browse the repository at this point in the history
Signed-off-by: Taylor Gray <tylgry@amazon.com>
  • Loading branch information
graytaylor0 committed Feb 14, 2024
1 parent a8024e9 commit 5f518fc
Show file tree
Hide file tree
Showing 14 changed files with 676 additions and 0 deletions.
16 changes: 16 additions & 0 deletions data-prepper-plugins/decompress-processor/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

// Build script for the decompress-processor plugin module: applies the Java plugin
// and declares the module's compile-time and test dependencies.
plugins {
id 'java'
}

dependencies {
// Sibling projects from this build.
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
// External libraries. No versions are pinned here — presumably they are managed
// centrally elsewhere in the build (TODO confirm).
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'io.micrometer:micrometer-core'
// NOTE(review): the processor sources also import Guava (Charsets), SLF4J and
// jakarta.validation, none of which are declared here — presumably they arrive
// transitively through the project dependencies above; confirm.
// Test-only dependency; `testLibs` looks like a version-catalog accessor (TODO confirm).
testImplementation testLibs.mockito.inline
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress;

import com.google.common.base.Charsets;
import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.processor.decompress.exceptions.DecodingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Collection;

/**
 * Processor that replaces the value of each configured key with its decoded and
 * decompressed form.
 *
 * <p>For every event, each configured key is read as a String, decoded with the
 * configured encoding type, run through the configured decompression engine, and the
 * resulting UTF-8 text is written back under the same key. Failures tag the event with
 * the configured failure tags and increment the
 * {@value #DECOMPRESSION_PROCESSING_ERRORS} counter; the event itself is always kept
 * in the returned collection.
 */
@DataPrepperPlugin(name = "decompress", pluginType = Processor.class, pluginConfigurationType = DecompressProcessorConfig.class)
public class DecompressProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {

    private static final Logger LOG = LoggerFactory.getLogger(DecompressProcessor.class);
    static final String DECOMPRESSION_PROCESSING_ERRORS = "decompressionProcessingErrors";

    /** Size, in chars, of the scratch buffer used when draining the decompressed stream. */
    private static final int READ_BUFFER_SIZE = 8192;

    private final DecompressProcessorConfig decompressProcessorConfig;
    private final ExpressionEvaluator expressionEvaluator;

    private final Counter decompressionProcessingErrors;

    /**
     * Creates the processor.
     *
     * @param pluginMetrics             metrics facade used to create the error counter
     * @param decompressProcessorConfig user-supplied processor configuration
     * @param expressionEvaluator       evaluator for the optional {@code decompress_when} condition
     */
    @DataPrepperPluginConstructor
    public DecompressProcessor(final PluginMetrics pluginMetrics,
                               final DecompressProcessorConfig decompressProcessorConfig,
                               final ExpressionEvaluator expressionEvaluator) {
        super(pluginMetrics);
        this.decompressProcessorConfig = decompressProcessorConfig;
        this.expressionEvaluator = expressionEvaluator;
        this.decompressionProcessingErrors = pluginMetrics.counter(DECOMPRESSION_PROCESSING_ERRORS);
    }

    /**
     * Decompresses the configured keys of every event in {@code records}, in place.
     *
     * @param records the records to process
     * @return the same collection that was passed in
     */
    @Override
    public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
        for (final Record<Event> record : records) {
            final Event event = record.getData();

            try {
                final String decompressWhen = decompressProcessorConfig.getDecompressWhen();
                if (decompressWhen != null && !expressionEvaluator.evaluateConditional(decompressWhen, event)) {
                    continue;
                }

                for (final String key : decompressProcessorConfig.getKeys()) {
                    decompressKey(event, key);
                }
            } catch (final Exception e) {
                // Boundary catch: anything not handled per key must not fail the whole batch.
                LOG.error("An uncaught exception occurred while decompressing Events", e);
                markFailure(event);
            }
        }

        return records;
    }

    @Override
    public void prepareForShutdown() {
        // Nothing to prepare: this processor holds no buffered state.
    }

    @Override
    public boolean isReadyForShutdown() {
        return true;
    }

    @Override
    public void shutdown() {
        // No resources to release.
    }

    /**
     * Decodes and decompresses the value stored under {@code key}, replacing it in the event.
     * A missing (null) value is skipped. Decode and decompression failures are recorded on the
     * event and do not propagate, so the remaining keys of the event are still processed.
     */
    private void decompressKey(final Event event, final String key) {
        final String compressedValue = event.get(key, String.class);
        if (compressedValue == null) {
            return;
        }

        final byte[] compressedValueAsBytes;
        try {
            compressedValueAsBytes = decompressProcessorConfig.getEncodingType().getDecoderEngine().decode(compressedValue);
        } catch (final DecodingException e) {
            LOG.error("Unable to decode key {} with base64: {}", key, e.getMessage());
            markFailure(event);
            return;
        }

        try (final InputStream inputStream = decompressProcessorConfig.getDecompressionType().getDecompressionEngine()
                .createInputStream(new ByteArrayInputStream(compressedValueAsBytes));
             final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, Charsets.UTF_8))
        ) {
            event.put(key, getDecompressedString(bufferedReader));
        } catch (final Exception e) {
            LOG.error("Unable to decompress key {} using decompression type {}:",
                    key, decompressProcessorConfig.getDecompressionType(), e);
            markFailure(event);
        }
    }

    /** Tags the event with the configured failure tags and bumps the error counter. */
    private void markFailure(final Event event) {
        event.getMetadata().addTags(decompressProcessorConfig.getTagsOnFailure());
        decompressionProcessingErrors.increment();
    }

    /**
     * Drains the reader into a String.
     *
     * <p>Characters are copied verbatim rather than read line by line, because
     * {@link BufferedReader#readLine()} strips line terminators and would silently
     * join the lines of multi-line content.
     *
     * @param bufferedReader reader over the decompressed character stream
     * @return the full decompressed text, line terminators included
     * @throws IOException if reading from the underlying stream fails
     */
    private String getDecompressedString(final BufferedReader bufferedReader) throws IOException {
        final StringBuilder stringBuilder = new StringBuilder();
        final char[] buffer = new char[READ_BUFFER_SIZE];
        int charsRead;

        while ((charsRead = bufferedReader.read(buffer, 0, buffer.length)) != -1) {
            stringBuilder.append(buffer, 0, charsRead);
        }

        return stringBuilder.toString();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.plugins.processor.decompress.encoding.EncodingType;
import org.opensearch.dataprepper.plugins.processor.decompress.encoding.IEncodingType;

import java.util.List;

/**
 * Jackson-bound configuration for the decompress processor.
 *
 * <p>Recognised properties: {@code keys}, {@code type}, {@code decompress_when} and
 * {@code tags_on_failure}. The encoding type is not configurable and is fixed to base64.
 */
public class DecompressProcessorConfig {

    /** Keys whose values are to be decompressed; at least one is required. */
    @NotEmpty
    @JsonProperty("keys")
    private List<String> keys;

    /** Decompression algorithm to apply; required. */
    @NotNull
    @JsonProperty("type")
    private DecompressionType decompressionType;

    /** Optional conditional expression; left null when not configured. */
    @JsonProperty("decompress_when")
    private String decompressWhen;

    /** Tags applied to an event when processing fails. */
    @JsonProperty("tags_on_failure")
    private List<String> tagsOnFailure = List.of("_decompression_failure");

    /** Encoding of the compressed value; excluded from (de)serialization. */
    @JsonIgnore
    private final EncodingType encodingType = EncodingType.BASE64;

    /** @return the configured keys */
    public List<String> getKeys() {
        return keys;
    }

    /** @return the configured decompression type */
    public IDecompressionType getDecompressionType() {
        return decompressionType;
    }

    /** @return the encoding type, always {@link EncodingType#BASE64} */
    public IEncodingType getEncodingType() {
        return encodingType;
    }

    /** @return the conditional expression, or {@code null} when none was configured */
    public String getDecompressWhen() {
        return decompressWhen;
    }

    /** @return the tags to add to events that fail processing */
    public List<String> getTagsOnFailure() {
        return tagsOnFailure;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress;

import com.fasterxml.jackson.annotation.JsonCreator;
import org.opensearch.dataprepper.model.codec.DecompressionEngine;
import org.opensearch.dataprepper.plugins.codec.GZipDecompressionEngine;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Supported decompression algorithms, each identified by its configuration option
 * string and bound to a single shared {@link DecompressionEngine} instance.
 */
public enum DecompressionType implements IDecompressionType {
    GZIP("gzip", new GZipDecompressionEngine());

    /** Lookup from configuration option string to enum constant. */
    private static final Map<String, DecompressionType> OPTIONS_MAP = Arrays.stream(values())
            .collect(Collectors.toMap(type -> type.option, type -> type));

    private final String option;
    private final DecompressionEngine decompressionEngine;

    DecompressionType(final String option, final DecompressionEngine decompressionEngine) {
        this.option = option;
        this.decompressionEngine = decompressionEngine;
    }

    /**
     * Resolves a configuration option string to its constant.
     *
     * @param option the option string from configuration
     * @return the matching constant, or {@code null} when the option is not recognised
     */
    @JsonCreator
    static DecompressionType fromOptionValue(final String option) {
        return OPTIONS_MAP.get(option);
    }

    /** @return the engine bound to this decompression type */
    @Override
    public DecompressionEngine getDecompressionEngine() {
        return decompressionEngine;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress;

import org.opensearch.dataprepper.model.codec.DecompressionEngine;

/**
 * A decompression type that can supply the engine used to decompress data.
 */
public interface IDecompressionType {
    /**
     * @return the decompression engine associated with this type
     */
    DecompressionEngine getDecompressionEngine();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress.encoding;

import org.opensearch.dataprepper.plugins.processor.decompress.exceptions.DecodingException;

import java.util.Base64;

/**
 * {@link DecoderEngine} that decodes base64 text using the JDK's basic decoder.
 */
public class Base64DecoderEngine implements DecoderEngine {
    /**
     * Decodes a base64 string into its raw bytes.
     *
     * @param encodedValue the base64-encoded text
     * @return the decoded bytes
     * @throws DecodingException if the value cannot be decoded; the original exception is
     *                           attached as the cause
     */
    @Override
    public byte[] decode(final String encodedValue) {
        try {
            return Base64.getDecoder().decode(encodedValue);
        } catch (final Exception e) {
            // Broad catch kept on purpose: any failure (including a null input) surfaces as
            // DecodingException, which is the type callers handle.
            final DecodingException decodingException = new DecodingException(
                    String.format("There was an error decoding with the base64 encoding type: %s", e.getMessage()));
            // Preserve the root cause so the original stack trace is not lost.
            decodingException.initCause(e);
            throw decodingException;
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress.encoding;

import org.opensearch.dataprepper.plugins.processor.decompress.exceptions.DecodingException;

/**
 * Converts an encoded String into its raw bytes.
 */
public interface DecoderEngine {
    /**
     * Decodes the given value.
     *
     * @param encodedValue the encoded text
     * @return the decoded bytes
     * @throws DecodingException if the value cannot be decoded
     */
    byte[] decode(String encodedValue) throws DecodingException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress.encoding;

import com.fasterxml.jackson.annotation.JsonCreator;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Supported encodings of compressed values, each identified by its option string and
 * bound to a single shared {@link DecoderEngine} instance.
 */
public enum EncodingType implements IEncodingType {
    BASE64("base64", new Base64DecoderEngine());

    /** Lookup from option string to enum constant. */
    private static final Map<String, EncodingType> OPTIONS_MAP = Arrays.stream(values())
            .collect(Collectors.toMap(type -> type.option, type -> type));

    private final String option;
    private final DecoderEngine decoderEngine;

    EncodingType(final String option, final DecoderEngine decoderEngine) {
        this.option = option;
        this.decoderEngine = decoderEngine;
    }

    /**
     * Resolves an option string to its constant.
     *
     * @param option the option string
     * @return the matching constant, or {@code null} when the option is not recognised
     */
    @JsonCreator
    static EncodingType fromOptionValue(final String option) {
        return OPTIONS_MAP.get(option);
    }

    /** @return the decoder engine bound to this encoding type */
    @Override
    public DecoderEngine getDecoderEngine() {
        return decoderEngine;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress.encoding;

/**
 * An encoding type that can supply the engine used to decode encoded values.
 */
public interface IEncodingType {
    /**
     * @return the decoder engine associated with this encoding type
     */
    DecoderEngine getDecoderEngine();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress.exceptions;

/**
 * Unchecked exception signalling that an encoded value could not be decoded.
 */
public class DecodingException extends RuntimeException {
    /**
     * Creates an exception with a detail message and no cause.
     *
     * @param message description of the decoding failure
     */
    public DecodingException(final String message) {
        super(message);
    }

    /**
     * Creates an exception with a detail message and the underlying cause, so the
     * original failure's stack trace is retained.
     *
     * @param message description of the decoding failure
     * @param cause   the exception that triggered this failure; may be {@code null}
     */
    public DecodingException(final String message, final Throwable cause) {
        super(message, cause);
    }
}
Loading

0 comments on commit 5f518fc

Please sign in to comment.