diff --git a/TRIAGING.md b/TRIAGING.md index a4a25e1932..ba20857061 100644 --- a/TRIAGING.md +++ b/TRIAGING.md @@ -19,7 +19,7 @@ However, should we run out of time before your issue is discussed, you are alway Meetings are hosted regularly on Tuesdays at 2:30 PM US Central Time (12:30 PM Pacific Time) and can be joined via the links posted on the [OpenSearch Meetup Group](https://www.meetup.com/opensearch/events/) list of events. The event will be titled `Data Prepper Triage Meeting`. -After joining the Zoom meeting, you can enable your video / voice to join the discussion. +After joining the video meeting, you can enable your video / voice to join the discussion. If you do not have a webcam or microphone available, you can still join in via the text chat. If you have an issue you'd like to bring forth, please have a link to the issue ready so it can be presented to everyone in the meeting. diff --git a/build.gradle b/build.gradle index f77ecc442b..3dccd497cf 100644 --- a/build.gradle +++ b/build.gradle @@ -226,6 +226,9 @@ subprojects { test { useJUnitPlatform() + javaLauncher = javaToolchains.launcherFor { + languageVersion = JavaLanguageVersion.current() + } reports { junitXml.required html.required diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java index 1c3e596265..26dd7e98a6 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java @@ -28,6 +28,7 @@ public abstract class AbstractSink<T extends Record<?>> implements Sink<T> { private Thread retryThread; private int maxRetries; private int waitTimeMs; + private SinkThread sinkThread; public AbstractSink(final PluginSetting pluginSetting, int numRetries, int waitTimeMs) { this.pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting); @@ -51,7 +52,8 @@ public void initialize() { // the exceptions which are not retryable.
doInitialize(); if (!isReady() && retryThread == null) { - retryThread = new Thread(new SinkThread(this, maxRetries, waitTimeMs)); + sinkThread = new SinkThread(this, maxRetries, waitTimeMs); + retryThread = new Thread(sinkThread); retryThread.start(); } } @@ -76,7 +78,7 @@ public void output(Collection records) { @Override public void shutdown() { if (retryThread != null) { - retryThread.stop(); + sinkThread.stop(); } } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkThread.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkThread.java index c304de37af..451cef7dff 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkThread.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkThread.java @@ -10,6 +10,8 @@ class SinkThread implements Runnable { private int maxRetries; private int waitTimeMs; + private volatile boolean isStopped = false; + public SinkThread(AbstractSink sink, int maxRetries, int waitTimeMs) { this.sink = sink; this.maxRetries = maxRetries; @@ -19,11 +21,15 @@ public SinkThread(AbstractSink sink, int maxRetries, int waitTimeMs) { @Override public void run() { int numRetries = 0; - while (!sink.isReady() && numRetries++ < maxRetries) { + while (!sink.isReady() && numRetries++ < maxRetries && !isStopped) { try { Thread.sleep(waitTimeMs); sink.doInitialize(); } catch (InterruptedException e){} } } + + public void stop() { + isStopped = true; + } } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java index 3b9fe7c007..8d1af7ea44 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java @@ -11,15 +11,10 @@ import org.opensearch.dataprepper.metrics.MetricNames; import org.opensearch.dataprepper.metrics.MetricsTestUtil; import org.opensearch.dataprepper.model.configuration.PluginSetting; -import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.event.EventHandle; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.mock; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.record.Record; import java.time.Duration; import java.util.Arrays; @@ -30,6 +25,12 @@ import java.util.UUID; import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; class AbstractSinkTest { private int count; @@ -71,13 +72,13 @@ void testMetrics() { } @Test - void testSinkNotReady() { + void testSinkNotReady() throws InterruptedException { final String sinkName = "testSink"; final String pipelineName = "pipelineName"; MetricsTestUtil.initMetrics(); PluginSetting pluginSetting = new PluginSetting(sinkName, Collections.emptyMap()); 
pluginSetting.setPipelineName(pipelineName); - AbstractSink> abstractSink = new AbstractSinkNotReadyImpl(pluginSetting); + AbstractSinkNotReadyImpl abstractSink = new AbstractSinkNotReadyImpl(pluginSetting); abstractSink.initialize(); assertEquals(abstractSink.isReady(), false); assertEquals(abstractSink.getRetryThreadState(), Thread.State.RUNNABLE); @@ -87,7 +88,10 @@ void testSinkNotReady() { await().atMost(Duration.ofSeconds(5)) .until(abstractSink::isReady); assertEquals(abstractSink.getRetryThreadState(), Thread.State.TERMINATED); + int initCountBeforeShutdown = abstractSink.initCount; abstractSink.shutdown(); + Thread.sleep(200); + assertThat(abstractSink.initCount, equalTo(initCountBeforeShutdown)); } @Test diff --git a/data-prepper-core/build.gradle b/data-prepper-core/build.gradle index 080538c5e4..c939129a1c 100644 --- a/data-prepper-core/build.gradle +++ b/data-prepper-core/build.gradle @@ -48,7 +48,6 @@ dependencies { exclude group: 'commons-logging', module: 'commons-logging' } implementation 'software.amazon.cloudwatchlogs:aws-embedded-metrics:2.0.0-beta-1' - testImplementation 'org.apache.logging.log4j:log4j-jpl:2.23.0' testImplementation testLibs.spring.test implementation libs.armeria.core implementation libs.armeria.grpc @@ -89,8 +88,6 @@ task integrationTest(type: Test) { classpath = sourceSets.integrationTest.runtimeClasspath - systemProperty 'log4j.configurationFile', 'src/test/resources/log4j2.properties' - filter { includeTestsMatching '*IT' } diff --git a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/AvroAutoSchemaGeneratorTest.java b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/AvroAutoSchemaGeneratorTest.java index 622eb56a1b..1b66b62c37 100644 --- a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/AvroAutoSchemaGeneratorTest.java +++ b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/AvroAutoSchemaGeneratorTest.java @@ -17,7 +17,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Random; +import java.util.Timer; import java.util.UUID; import java.util.stream.Stream; @@ -218,7 +218,7 @@ static class SomeUnknownTypesArgumentsProvider implements ArgumentsProvider { @Override public Stream provideArguments(ExtensionContext context) { return Stream.of( - arguments(Random.class), + arguments(Timer.class), arguments(InputStream.class), arguments(File.class) ); diff --git a/data-prepper-plugins/blocking-buffer/src/test/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBufferTests.java b/data-prepper-plugins/blocking-buffer/src/test/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBufferTests.java index 194c810ec4..f3f28db174 100644 --- a/data-prepper-plugins/blocking-buffer/src/test/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBufferTests.java +++ b/data-prepper-plugins/blocking-buffer/src/test/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBufferTests.java @@ -328,7 +328,7 @@ public Stream provideArguments(final ExtensionContext conte return Stream.of( Arguments.of(0, randomInt + 1, 0.0), Arguments.of(1, 100, 1.0), - Arguments.of(randomInt, randomInt, 100.0), + Arguments.of(randomInt + 1, randomInt + 1, 100.0), Arguments.of(randomInt, randomInt + 250, ((double) randomInt / (randomInt + 250)) * 100), Arguments.of(6, 9, 66.66666666666666), Arguments.of(531, 1000, 53.1), diff --git 
a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java index f85d1c6605..a4b0377963 100644 --- a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java +++ b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java @@ -11,9 +11,12 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; + import static org.mockito.Mockito.when; import static org.mockito.Mockito.mock; + import org.mockito.Mock; + import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.CoreMatchers.not; @@ -28,6 +31,7 @@ import java.io.ByteArrayInputStream; import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.List; import java.util.LinkedList; import java.util.Map; @@ -56,7 +60,7 @@ public EventJsonInputCodec createInputCodec() { @ParameterizedTest @ValueSource(strings = {"", "{}"}) public void emptyTest(String input) throws Exception { - input = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\", \""+EventJsonDefines.EVENTS+"\":["+input+"]}"; + input = "{\"" + EventJsonDefines.VERSION + "\":\"" + DataPrepperVersion.getCurrentVersion().toString() + "\", \"" + EventJsonDefines.EVENTS + "\":[" + input + "]}"; ByteArrayInputStream inputStream = new ByteArrayInputStream(input.getBytes()); inputCodec = createInputCodec(); Consumer> consumer = mock(Consumer.class); @@ -70,15 +74,15 @@ public void inCompatibleVersionTest() throws Exception { final String key = UUID.randomUUID().toString(); final String value = UUID.randomUUID().toString(); Map data = Map.of(key, value); - Instant startTime = Instant.now(); + Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS); Event event = createEvent(data, startTime); Map dataMap = event.toMap(); Map metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class); - String input = "{\""+EventJsonDefines.VERSION+"\":\"3.0\", \""+EventJsonDefines.EVENTS+"\":["; + String input = "{\"" + EventJsonDefines.VERSION + "\":\"3.0\", \"" + EventJsonDefines.EVENTS + "\":["; String comma = ""; for (int i = 0; i < 2; i++) { - input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}"; + input += comma + "{\"data\":" + objectMapper.writeValueAsString(dataMap) + "," + "\"metadata\":" + objectMapper.writeValueAsString(metadataMap) + "}"; comma = ","; } input += "]}"; @@ -95,15 +99,15 @@ public void basicTest() throws Exception { final String key = UUID.randomUUID().toString(); final String value = UUID.randomUUID().toString(); Map data = Map.of(key, value); - Instant startTime = Instant.now(); + Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS); Event event = createEvent(data, startTime); Map dataMap = event.toMap(); Map metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class); - String input = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\", \""+EventJsonDefines.EVENTS+"\":["; + String input = "{\"" + EventJsonDefines.VERSION + "\":\"" + 
DataPrepperVersion.getCurrentVersion().toString() + "\", \"" + EventJsonDefines.EVENTS + "\":["; String comma = ""; for (int i = 0; i < 2; i++) { - input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}"; + input += comma + "{\"data\":" + objectMapper.writeValueAsString(dataMap) + "," + "\"metadata\":" + objectMapper.writeValueAsString(metadataMap) + "}"; comma = ","; } input += "]}"; @@ -111,8 +115,8 @@ public void basicTest() throws Exception { List> records = new LinkedList<>(); inputCodec.parse(inputStream, records::add); assertThat(records.size(), equalTo(2)); - for(Record record : records) { - Event e = (Event)record.getData(); + for (Record record : records) { + Event e = (Event) record.getData(); assertThat(e.get(key, String.class), equalTo(value)); assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime)); assertThat(e.getMetadata().getTags().size(), equalTo(0)); @@ -126,15 +130,15 @@ public void test_with_timeReceivedOverridden() throws Exception { final String key = UUID.randomUUID().toString(); final String value = UUID.randomUUID().toString(); Map data = Map.of(key, value); - Instant startTime = Instant.now().minusSeconds(5); + Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS).minusSeconds(5); Event event = createEvent(data, startTime); Map dataMap = event.toMap(); Map metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class); - String input = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\", \""+EventJsonDefines.EVENTS+"\":["; + String input = "{\"" + EventJsonDefines.VERSION + "\":\"" + DataPrepperVersion.getCurrentVersion().toString() + "\", \"" + EventJsonDefines.EVENTS + "\":["; String comma = ""; for (int i = 0; i < 2; i++) { - input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}"; + input += comma + "{\"data\":" + objectMapper.writeValueAsString(dataMap) + "," + "\"metadata\":" + objectMapper.writeValueAsString(metadataMap) + "}"; comma = ","; } input += "]}"; @@ -142,8 +146,8 @@ public void test_with_timeReceivedOverridden() throws Exception { List> records = new LinkedList<>(); inputCodec.parse(inputStream, records::add); assertThat(records.size(), equalTo(2)); - for(Record record : records) { - Event e = (Event)record.getData(); + for (Record record : records) { + Event e = (Event) record.getData(); assertThat(e.get(key, String.class), equalTo(value)); assertThat(e.getMetadata().getTimeReceived(), not(equalTo(startTime))); assertThat(e.getMetadata().getTags().size(), equalTo(0)); @@ -159,7 +163,7 @@ private Event createEvent(final Map json, final Instant timeRece if (timeReceived != null) { logBuilder.withTimeReceived(timeReceived); } - final JacksonEvent event = (JacksonEvent)logBuilder.build(); + final JacksonEvent event = (JacksonEvent) logBuilder.build(); return event; } diff --git a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputOutputCodecTest.java b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputOutputCodecTest.java index 85e91e5a55..7ea8c49cd0 100644 --- a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputOutputCodecTest.java +++ 
b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputOutputCodecTest.java @@ -6,9 +6,12 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; + import static org.mockito.Mockito.when; import static org.mockito.Mockito.mock; + import org.mockito.Mock; + import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -22,6 +25,7 @@ import org.opensearch.dataprepper.model.log.JacksonLog; import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.List; import java.util.LinkedList; import java.util.Map; @@ -64,7 +68,7 @@ public void basicTest() throws Exception { final String value = UUID.randomUUID().toString(); Map data = Map.of(key, value); - Instant startTime = Instant.now(); + Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS); Event event = createEvent(data, startTime); outputCodec = createOutputCodec(); inputCodec = createInputCodec(); @@ -75,8 +79,8 @@ public void basicTest() throws Exception { inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add); assertThat(records.size(), equalTo(1)); - for(Record record : records) { - Event e = (Event)record.getData(); + for (Record record : records) { + Event e = (Event) record.getData(); assertThat(e.get(key, String.class), equalTo(value)); assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime)); assertThat(e.getMetadata().getTags().size(), equalTo(0)); @@ -90,7 +94,7 @@ public void multipleEventsTest() throws Exception { final String value = UUID.randomUUID().toString(); Map data = Map.of(key, value); - Instant startTime = Instant.now(); + Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS); Event event = createEvent(data, startTime); outputCodec = createOutputCodec(); inputCodec = createInputCodec(); @@ -103,8 +107,8 @@ public void multipleEventsTest() throws Exception { inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add); assertThat(records.size(), equalTo(3)); - for(Record record : records) { - Event e = (Event)record.getData(); + for (Record record : records) { + Event e = (Event) record.getData(); assertThat(e.get(key, String.class), equalTo(value)); assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime)); assertThat(e.getMetadata().getTags().size(), equalTo(0)); @@ -122,7 +126,7 @@ public void extendedTest() throws Exception { Set tags = Set.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); List tagsList = tags.stream().collect(Collectors.toList()); - Instant startTime = Instant.now(); + Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS); Event event = createEvent(data, startTime); Instant origTime = startTime.minusSeconds(5); event.getMetadata().setExternalOriginationTime(origTime); @@ -135,11 +139,11 @@ public void extendedTest() throws Exception { outputCodec.complete(outputStream); assertThat(outputCodec.getExtension(), equalTo(EventJsonOutputCodec.EVENT_JSON)); List> records = new LinkedList<>(); -inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add); + inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add); assertThat(records.size(), equalTo(1)); - for(Record record : records) { - Event e = (Event)record.getData(); + for (Record record : records) { + Event e = (Event) record.getData(); assertThat(e.get(key, String.class), equalTo(value)); assertThat(e.getMetadata().getTimeReceived(), 
equalTo(startTime)); assertThat(e.getMetadata().getTags(), equalTo(tags)); @@ -157,7 +161,7 @@ private Event createEvent(final Map json, final Instant timeRece if (timeReceived != null) { logBuilder.withTimeReceived(timeReceived); } - final JacksonEvent event = (JacksonEvent)logBuilder.build(); + final JacksonEvent event = (JacksonEvent) logBuilder.build(); return event; } diff --git a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodecTest.java b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodecTest.java index 51dda545cb..b32d2b62e9 100644 --- a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodecTest.java +++ b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodecTest.java @@ -11,6 +11,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; + import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -22,6 +23,7 @@ import org.opensearch.dataprepper.model.log.JacksonLog; import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.Map; import java.util.UUID; @@ -49,7 +51,7 @@ public void basicTest() throws Exception { final String value = UUID.randomUUID().toString(); Map data = Map.of(key, value); - Instant startTime = Instant.now(); + Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS); Event event = createEvent(data, startTime); outputCodec = createOutputCodec(); outputCodec.start(outputStream, null, null); @@ -59,10 +61,10 @@ public void basicTest() throws Exception { Map dataMap = event.toMap(); Map metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class); //String expectedOutput = "{\"version\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\",\""+EventJsonDefines.EVENTS+"\":["; - String expectedOutput = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\",\""+EventJsonDefines.EVENTS+"\":["; + String expectedOutput = "{\"" + EventJsonDefines.VERSION + "\":\"" + DataPrepperVersion.getCurrentVersion().toString() + "\",\"" + EventJsonDefines.EVENTS + "\":["; String comma = ""; for (int i = 0; i < 2; i++) { - expectedOutput += comma+"{\""+EventJsonDefines.DATA+"\":"+objectMapper.writeValueAsString(dataMap)+","+"\""+EventJsonDefines.METADATA+"\":"+objectMapper.writeValueAsString(metadataMap)+"}"; + expectedOutput += comma + "{\"" + EventJsonDefines.DATA + "\":" + objectMapper.writeValueAsString(dataMap) + "," + "\"" + EventJsonDefines.METADATA + "\":" + objectMapper.writeValueAsString(metadataMap) + "}"; comma = ","; } expectedOutput += "]}"; @@ -78,7 +80,7 @@ private Event createEvent(final Map json, final Instant timeRece if (timeReceived != null) { logBuilder.withTimeReceived(timeReceived); } - final JacksonEvent event = (JacksonEvent)logBuilder.build(); + final JacksonEvent event = (JacksonEvent) logBuilder.build(); return event; } diff --git a/data-prepper-plugins/obfuscate-processor/README.md b/data-prepper-plugins/obfuscate-processor/README.md index 8e48582cf1..842d08106d 100644 --- a/data-prepper-plugins/obfuscate-processor/README.md +++ b/data-prepper-plugins/obfuscate-processor/README.md @@ -64,7 +64,7 @@ Below are the list of configuration options. * `patterns` - (optional) - A list of Regex patterns. 
You can define multiple patterns for the same field. Only the parts that match the Regex patterns will be obfuscated. If not provided, the full field will be obfuscated. * `single_word_only` - (optional) - When set to `true`, a word boundary `\b` is added to the pattern, so that obfuscation is applied only to words that are standalone in the input text. By default, it is `false`, meaning obfuscation patterns are applied to all occurrences. -* `action` - (optional) - Obfuscation action, default to `mask`. Currently, `mask` is the only supported action. +* `action` - (optional) - Obfuscation action, either `mask` or `hash` to use one-way hashing. Defaults to `mask`. ### Configuration - Mask Action @@ -75,6 +75,57 @@ There are some additional configuration options for Mask action. * `mask_character_length` - (optional) - Default to 3. The value must be between 1 and 10. There will be n numbers of obfuscation characters, e.g. '***' +### Configuration - One Way Hash Action + +There are some additional configuration options for the One Way Hash action. + +* `format` - (optional) - Format of the one-way hash to use. Defaults to SHA-512. +* `salt` - (optional) - Salt value to use when generating the hash. Defaults to a randomly generated salt. +* `salt_key` - (optional) - Generates the salt for each record based on the value of a specified field in the message. + +```yaml +pipeline: + source: + http: + processor: + - obfuscate: + source: "log" + target: "new_log" + patterns: + - "[A-Za-z0-9+_.-]+@([\\w-]+\\.)+[\\w-]{2,4}" + action: + hash: + salt_key: "/" + salt: "" + - obfuscate: + source: "phone" + action: + hash: + salt: "" + sink: + - stdout: +``` + +Take the following input + +```json +{ + "id": 1, + "phone": "(555) 555 5555", + "log": "My name is Bob and my email address is abc@example.com" +} +``` + +When run, the processor will parse the message into the following output, where each `<hash>` stands for the Base64-encoded one-way hash of the matched value: + +```json +{ + "id": 1, + "phone": "<hash>", + "log": "My name is Bob and my email address is abc@example.com", + "new_log": "My name is Bob and my email address is <hash>" +} +``` --- ## FAQ: diff --git a/data-prepper-plugins/obfuscate-processor/build.gradle b/data-prepper-plugins/obfuscate-processor/build.gradle index 22909eecd9..83e21a5889 100644 --- a/data-prepper-plugins/obfuscate-processor/build.gradle +++ b/data-prepper-plugins/obfuscate-processor/build.gradle @@ -4,6 +4,7 @@ dependencies { implementation 'com.fasterxml.jackson.core:jackson-core' implementation 'com.fasterxml.jackson.core:jackson-databind' testImplementation project(':data-prepper-test-common') + testImplementation project(':data-prepper-test-event') } test { diff --git a/data-prepper-plugins/obfuscate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/obfuscation/ObfuscationProcessor.java b/data-prepper-plugins/obfuscate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/obfuscation/ObfuscationProcessor.java index bbb1a1600a..472ffec940 100644 --- a/data-prepper-plugins/obfuscate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/obfuscation/ObfuscationProcessor.java +++ b/data-prepper-plugins/obfuscate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/obfuscation/ObfuscationProcessor.java @@ -126,7 +126,7 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) { String rawValue = recordEvent.get(source, String.class); // Call obfuscation action - String newValue = this.action.obfuscate(rawValue, patterns); + String newValue = this.action.obfuscate(rawValue, patterns, record); // No changes means it does not match any patterns if (rawValue.equals(newValue)) {
diff --git a/data-prepper-plugins/obfuscate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/obfuscation/action/MaskAction.java b/data-prepper-plugins/obfuscate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/obfuscation/action/MaskAction.java index 45fc27fe27..2435156b5f 100644 --- a/data-prepper-plugins/obfuscate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/obfuscation/action/MaskAction.java +++ b/data-prepper-plugins/obfuscate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/obfuscation/action/MaskAction.java @@ -7,6 +7,8 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; import java.util.List; import java.util.regex.Pattern; @@ -21,7 +23,7 @@ public MaskAction(final MaskActionConfig config) { } @Override - public String obfuscate(String source, List<Pattern> patterns) { + public String obfuscate(String source, List<Pattern> patterns, Record<Event> record) { if (patterns == null || patterns.size() == 0) { // This is to replace the whole field. diff --git a/data-prepper-plugins/obfuscate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/obfuscation/action/ObfuscationAction.java b/data-prepper-plugins/obfuscate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/obfuscation/action/ObfuscationAction.java index 1a0376cb89..0e6b71e2aa 100644 --- a/data-prepper-plugins/obfuscate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/obfuscation/action/ObfuscationAction.java +++ b/data-prepper-plugins/obfuscate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/obfuscation/action/ObfuscationAction.java @@ -8,6 +8,9 @@ import java.util.List; import java.util.regex.Pattern; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; + /** * Interface represents a specific action to be taken for obfuscation.
@@ -20,7 +23,8 @@ public interface ObfuscationAction { * * @param source source string * @param patterns a list of patterns to match + * @param record raw record * @return obfuscated string */ - String obfuscate(String source, List<Pattern> patterns); + String obfuscate(String source, List<Pattern> patterns, Record<Event> record); } diff --git a/data-prepper-plugins/obfuscate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/obfuscation/action/OneWayHashAction.java b/data-prepper-plugins/obfuscate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/obfuscation/action/OneWayHashAction.java new file mode 100644 index 0000000000..28e47eae08 --- /dev/null +++ b/data-prepper-plugins/obfuscate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/obfuscation/action/OneWayHashAction.java @@ -0,0 +1,142 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.obfuscation.action; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; +import java.util.Arrays; +import java.util.Base64; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.opensearch.dataprepper.model.event.EventKey; + +@DataPrepperPlugin(name = "hash", pluginType = ObfuscationAction.class, pluginConfigurationType = OneWayHashActionConfig.class) +public class OneWayHashAction implements ObfuscationAction { + + private final MessageDigest messageDigest; + private final byte[] salt; + private EventKey saltKey; + private static final Logger LOG = LoggerFactory.getLogger(OneWayHashAction.class); + + @DataPrepperPluginConstructor + public OneWayHashAction(final OneWayHashActionConfig config) { + + this.saltKey = config.getSaltKey(); + + if (config.getSalt() == null || config.getSalt().isEmpty()) { + this.salt = generateSalt(); + } else { + this.salt = config.getSalt().getBytes(StandardCharsets.UTF_8); + } + + try { + messageDigest = MessageDigest.getInstance(config.getFormat()); + } catch (NoSuchAlgorithmException noSuchAlgorithmException) { + LOG.error("The hash format provided ({}) is not a known algorithm", config.getFormat(), noSuchAlgorithmException); + throw new RuntimeException(noSuchAlgorithmException); + } + } + + @Override + public String obfuscate(String source, List<Pattern> patterns, Record<Event> record) { + + byte[] saltToApply = this.salt; + + // Resolve the salt to apply based on a path provided in the configuration. + // For records where the path is not found, the salt value defined in the pipeline configuration will be used; if no salt value was configured, a randomly generated one is used.
+ + if (saltKey != null && !saltKey.equals("")) { + + final Event recordEvent = record.getData(); + + if (recordEvent.containsKey(saltKey)) { + + saltToApply = computeSaltBasedOnKeyValue(recordEvent.get(saltKey, String.class)); + } else { + LOG.info("Unable to find key '{}' to use as salt; using the salt from the pipeline configuration for this record instead", saltKey); + } + } + + if (patterns == null || patterns.size() == 0) { + // no pattern to match, replace the whole string + return oneWayHashString(source, saltToApply); + } + + String replacementString = source; + + for (Pattern pattern : patterns) { + + Matcher matcher = pattern.matcher(replacementString); + StringBuffer stringBuffer = new StringBuffer(); + + while (matcher.find()) { + + String stringToHash = replacementString.substring(matcher.start(), matcher.end()); + matcher.appendReplacement(stringBuffer, oneWayHashString(stringToHash, saltToApply)); + } + + matcher.appendTail(stringBuffer); + replacementString = stringBuffer.toString(); + } + return replacementString; + } + + private String oneWayHashString(String source, byte[] salt) { + + String oneWayHashedSource = ""; + + try { + MessageDigest messageDigestClone = (MessageDigest) messageDigest.clone(); + + messageDigestClone.update(salt); + byte[] bytes = messageDigestClone.digest(source.getBytes(StandardCharsets.UTF_8)); + + oneWayHashedSource = Base64.getEncoder().encodeToString(bytes); + + } catch (CloneNotSupportedException cloneNotSupportedException) { + LOG.error("There was an exception while processing an event", cloneNotSupportedException); + throw new RuntimeException(cloneNotSupportedException); + } + + return oneWayHashedSource; + } + + private byte[] computeSaltBasedOnKeyValue(String saltValue) { + + byte[] value = saltValue.getBytes(StandardCharsets.UTF_8); + byte[] result = new byte[64]; + + Arrays.fill(result, Byte.MIN_VALUE); + + System.arraycopy(value, 0, result, 0, Math.min(value.length, result.length)); + + return result; + } + + private byte[] generateSalt() { + + byte[] saltBytes = new byte[64]; + SecureRandom secureRandom = new SecureRandom(); + secureRandom.nextBytes(saltBytes); + return saltBytes; + } +} diff --git a/data-prepper-plugins/obfuscate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/obfuscation/action/OneWayHashActionConfig.java b/data-prepper-plugins/obfuscate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/obfuscation/action/OneWayHashActionConfig.java new file mode 100644 index 0000000000..3e3ab622c5 --- /dev/null +++ b/data-prepper-plugins/obfuscate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/obfuscation/action/OneWayHashActionConfig.java @@ -0,0 +1,58 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.obfuscation.action; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; + +import jakarta.validation.constraints.Pattern; +import jakarta.validation.constraints.Size; + +import org.opensearch.dataprepper.model.event.EventKey; +import org.opensearch.dataprepper.model.event.EventKeyConfiguration; +import org.opensearch.dataprepper.model.event.EventKeyFactory; + +public class OneWayHashActionConfig { + + @JsonProperty("salt") + @JsonPropertyDescription("Salt value to use when generating hash. If not specified, salt will be randomly generated by the processor.") + @Size(min = 16, message = "Minimum size of salt string is 16.") + @Size(max = 64, message = "Maximum size of salt string is 64.") + private String salt; + + @JsonProperty("format") + @Pattern(regexp = "SHA-512", message = "Valid values: SHA-512") + @JsonPropertyDescription("Format of the one-way hash to generate. Defaults to SHA-512.") + private String format = "SHA-512"; + + @JsonProperty("salt_key") + @JsonPropertyDescription("A key to compute salt based on a value provided as part of a record."
+ + "If key or value was not found in the record(s), a salt defined in the pipeline configuration will be used instead.") + @EventKeyConfiguration(EventKeyFactory.EventAction.GET) + private EventKey saltKey; + + public OneWayHashActionConfig(){ + + } + + public String getSalt () { + return salt; + } + + public String getFormat() { + return format; + } + + public EventKey getSaltKey() { + return saltKey; + } + +} + diff --git a/data-prepper-plugins/obfuscate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/obfuscation/ObfuscationProcessorTest.java b/data-prepper-plugins/obfuscate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/obfuscation/ObfuscationProcessorTest.java index be35b2cf01..8e1f556110 100644 --- a/data-prepper-plugins/obfuscate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/obfuscation/ObfuscationProcessorTest.java +++ b/data-prepper-plugins/obfuscate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/obfuscation/ObfuscationProcessorTest.java @@ -140,11 +140,12 @@ void testBasicProcessor(String message) { void testProcessorWithDifferentAction() { final PluginModel mockModel = mock(PluginModel.class); final ObfuscationAction mockAction = mock(ObfuscationAction.class); + when(mockModel.getPluginName()).thenReturn("mock"); when(mockModel.getPluginSettings()).thenReturn(new HashMap<>()); when(mockConfig.getAction()).thenReturn(mockModel); when(mockConfig.getTarget()).thenReturn(""); - when(mockAction.obfuscate(anyString(), anyList())).thenReturn("abc"); + when(mockAction.obfuscate(anyString(), anyList(),any())).thenReturn("abc"); when(mockFactory.loadPlugin(eq(ObfuscationAction.class), any(PluginSetting.class))) .thenReturn(mockAction); diff --git a/data-prepper-plugins/obfuscate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/obfuscation/action/MaskActionTest.java b/data-prepper-plugins/obfuscate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/obfuscation/action/MaskActionTest.java index 3abedf5a61..ffcb336a01 100644 --- a/data-prepper-plugins/obfuscate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/obfuscation/action/MaskActionTest.java +++ b/data-prepper-plugins/obfuscate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/obfuscation/action/MaskActionTest.java @@ -16,7 +16,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; -class MaskActionTest { +class MaskActionTest implements ObfuscationActionTest { private MaskAction maskAction; @@ -25,7 +25,7 @@ class MaskActionTest { void testObfuscateWithPatternAsNull() { String message = "Hello"; maskAction = createMaskAction("*", 3); - String result = maskAction.obfuscate(message, null); + String result = maskAction.obfuscate(message, null, createRecord(message)); assertThat(result, equalTo("***")); } @@ -39,7 +39,7 @@ void testObfuscateWithPatternAsNull() { void testObfuscateWithDifferentConfig(String message, String maskCharacter, int maskCharacterLength, String expected) { maskAction = createMaskAction(maskCharacter, maskCharacterLength); List patterns = new ArrayList<>(); - String result = maskAction.obfuscate(message, patterns); + String result = maskAction.obfuscate(message, patterns,createRecord(message)); assertThat(result, equalTo(expected)); } diff --git a/data-prepper-plugins/obfuscate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/obfuscation/action/ObfuscationActionTest.java 
b/data-prepper-plugins/obfuscate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/obfuscation/action/ObfuscationActionTest.java new file mode 100644 index 0000000000..f43f3f6f99 --- /dev/null +++ b/data-prepper-plugins/obfuscate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/obfuscation/action/ObfuscationActionTest.java @@ -0,0 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.obfuscation.action; + +import java.util.HashMap; +import java.util.Map; + +import org.opensearch.dataprepper.event.TestEventFactory; +import org.opensearch.dataprepper.model.event.EventBuilder; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; + +interface ObfuscationActionTest { + + default Record<Event> createRecord(String message) { + final Map<String, Object> testData = new HashMap<>(); + testData.put("message", message); + + return new Record<>(TestEventFactory.getTestEventFactory().eventBuilder(EventBuilder.class).withEventType("event").withData(testData).build()); + } +} diff --git a/data-prepper-plugins/obfuscate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/obfuscation/action/OneWayHashActionTest.java b/data-prepper-plugins/obfuscate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/obfuscation/action/OneWayHashActionTest.java new file mode 100644 index 0000000000..8b974bed30 --- /dev/null +++ b/data-prepper-plugins/obfuscate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/obfuscation/action/OneWayHashActionTest.java @@ -0,0 +1,174 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.obfuscation.action; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.provider.CsvSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + +import org.opensearch.dataprepper.event.TestEventKeyFactory; +import org.opensearch.dataprepper.model.event.EventKeyFactory; + +@ExtendWith(MockitoExtension.class) +public class OneWayHashActionTest implements ObfuscationActionTest { + + @Mock + OneWayHashActionConfig mockConfig; + + private final EventKeyFactory eventKeyFactory = TestEventKeyFactory.getTestEventFactory(); + + @Test + void testObfuscateWithPatternAsNull() { + String message = "Hello"; + when(mockConfig.getSaltKey()).thenReturn(null); + when(mockConfig.getSalt()).thenReturn(""); + when(mockConfig.getFormat()).thenReturn("SHA-512"); + + String result = new OneWayHashAction(mockConfig).obfuscate(message, null, createRecord(message)); + assertNotNull(result); + + assertThat(result, not(containsString(message))); + assertThat(result.length(), equalTo(88)); + } + + @ParameterizedTest + @CsvSource({
"Hello,AAAAAAAAAAAAAAAA,2NYZBaQ9nySumhHENpiKatKJhU3jqHC8jJ4DZC612RPGvkzPK1K12DskOI8Cn3qeOMSCTNIWErcGZr8JV4i9HQ==", + "Hi,BBBBBBBBBBBBBBBB,s3S4lyurJvJpQJ6EHN3gi/kexv79Ox+nIqXuVdbvgZP0b718AAxX0bOCPLeOZCnq3p3+DS+a0q0xLSJoMqjsNQ==", + "Hello,CCCCCCCCCCCCCCCC,SsUUpl/+GtU7cRg3ffuRKAtPU7cftdN440sNKR+gABy6JV6crwn5VTNSIqGKaTgBcZeYICy2ZmxP1DiHcW31rA==", + "H,DDDDDDDDDDDDDDDD,XR6utNkOp9te4+0vaRE0+ky/Zyw/gok1sI8qR/stZqFPoU733KwFcur36FCTUZd+i/UpyyJ9L/W6ObwPIf7iuw==", + }) + void testObfuscateWithDifferentConfig(String message, String salt, String expected) { + + when(mockConfig.getSalt()).thenReturn(salt); + when(mockConfig.getSaltKey()).thenReturn(null); + when(mockConfig.getFormat()).thenReturn("SHA-512"); + + OneWayHashAction oneWayHashAction = new OneWayHashAction(mockConfig); + + List patterns = new ArrayList<>(); + String result = oneWayHashAction.obfuscate(message, patterns,createRecord(message)); + assertThat(result, equalTo(expected)); + } + + + @ParameterizedTest + @CsvSource({ + "testing this functionality, test, AAAAAAAAAAAAAAAA, ILsULwmg32tiEQGqeX1rpWI9PGZXSX2Q9tRzXCD0cD/OKMMEBEXKYZhnXj1Xr9q+Dxa11iOmuXd+hx4ZTUaBCg==ing this functionality", + "test this functionality, test, BBBBBBBBBBBBBBBB, QT4wuvJSvgrxa/27gf4cZ1jzeNyiOnDxsY0oS7SsC/eVpBNyhj2I8Rh6/wCsvqRyzAvVoksTKOuRzSFUm6vAQw== this functionality", + "another test of this functionality, test, CCCCCCCCCCCCCCCC, another H9YrqOIlLtaoSCkNR2M0go3npf118KbsHFemyvJUX4+zt8FvjoiReq/0pk5va5i+7eX6XTOMwNokUUl4r+PTHw== of this functionality", + "My name is Bob and my email address is abc@example.com as of now and xyz@example.org in the future, [A-Za-z0-9+_.-]+@([\\w-]+\\.)+[\\w-] ,DDDDDDDDDDDDDDDD, My name is Bob and my email address is DdijIn6L3Cs4+PCYwCy+3bzLZ7w228quoodeI+VDlyMeFe+uZ/Ec1x/DK7MHSmZm8N5SZrINhvGgyig7aEBflg==om as of now and XQGlFjysVX1lkTFoRVCY+QEOfOf6nCoaRy5lxGAHyaFRgMGDpq93PwgZd18DZ3ZfWFRCwgPDGaExJDuRa0kkEQ==rg in the future", + }) + void testObfuscateWithPatterns(String message, String pattern, String salt, String expected) { + + when(mockConfig.getSalt()).thenReturn(salt); + when(mockConfig.getFormat()).thenReturn("SHA-512"); + + OneWayHashAction oneWayHashAction = new OneWayHashAction(mockConfig); + + + Pattern compiledPattern = Pattern.compile(pattern); + List patterns = new ArrayList<>(); + patterns.add(compiledPattern); + String result = oneWayHashAction.obfuscate(message, patterns,createRecord(message)); + assertThat(result, equalTo(expected)); + } + + @ParameterizedTest + @CsvSource({ + "testing this functionality and this test, test, this, AAAAAAAAAAAAAAAA, ILsULwmg32tiEQGqeX1rpWI9PGZXSX2Q9tRzXCD0cD/OKMMEBEXKYZhnXj1Xr9q+Dxa11iOmuXd+hx4ZTUaBCg==ing VsljIdInUvEk2ShjqBF94jgwWDk1lqcE/Fmb/LACPRlwIKsdmlk2PPX2o0XHObp4kRDqd+gUU5iUa/4HXhaA8g== functionality and VsljIdInUvEk2ShjqBF94jgwWDk1lqcE/Fmb/LACPRlwIKsdmlk2PPX2o0XHObp4kRDqd+gUU5iUa/4HXhaA8g== ILsULwmg32tiEQGqeX1rpWI9PGZXSX2Q9tRzXCD0cD/OKMMEBEXKYZhnXj1Xr9q+Dxa11iOmuXd+hx4ZTUaBCg==", + "test this functionality, test, this, BBBBBBBBBBBBBBBB, QT4wuvJSvgrxa/27gf4cZ1jzeNyiOnDxsY0oS7SsC/eVpBNyhj2I8Rh6/wCsvqRyzAvVoksTKOuRzSFUm6vAQw== LAD8UPdf/1cMoKY7Py17uRFNA+OEpVpa9lulTW8wEhsfQsDf/FvBIYxt/YO04sBI8CA1WY+i4elM5nY0xh13Lw== functionality", + "another test of this functionality, test, this, CCCCCCCCCCCCCCCC, another H9YrqOIlLtaoSCkNR2M0go3npf118KbsHFemyvJUX4+zt8FvjoiReq/0pk5va5i+7eX6XTOMwNokUUl4r+PTHw== of oAY9W4VW35Z14mrUisMks9mTILHsswbjjrJt96swt20/lnkMyf0izXV8OhQIh2N7Ml88uXU1fUfk0jTq41udfw== functionality", + "My name is Bob and my email address is abc@example.com as of now and 
xyz@example.org in the future, [A-Za-z0-9+_.-]+@([\\w-]+\\.)+[\\w-], Bob ,DDDDDDDDDDDDDDDD, My name is aDNCnlEqYbJO9KKnHEhhJSSyy2BB10CUSJxRMCSGLD1gdRNFVTo+Pz7xFepWfVOhuUGulvbnitdPoc8JIlEIFg== and my email address is DdijIn6L3Cs4+PCYwCy+3bzLZ7w228quoodeI+VDlyMeFe+uZ/Ec1x/DK7MHSmZm8N5SZrINhvGgyig7aEBflg==om as of now and XQGlFjysVX1lkTFoRVCY+QEOfOf6nCoaRy5lxGAHyaFRgMGDpq93PwgZd18DZ3ZfWFRCwgPDGaExJDuRa0kkEQ==rg in the future", + }) + void testObfuscateWithTwoPatterns(String message, String pattern1, String pattern2, String salt, String expected) { + + when(mockConfig.getSalt()).thenReturn(salt); + when(mockConfig.getFormat()).thenReturn("SHA-512"); + + OneWayHashAction oneWayHashAction = new OneWayHashAction(mockConfig); + + Pattern compiledPattern1 = Pattern.compile(pattern1); + Pattern compiledPattern2 = Pattern.compile(pattern2); + + List patterns = new ArrayList<>(); + patterns.add(compiledPattern1); + patterns.add(compiledPattern2); + String result = oneWayHashAction.obfuscate(message, patterns,createRecord(message)); + assertThat(result, equalTo(expected)); + } + + @ParameterizedTest + @CsvSource({ + "testing this functionality, test, AAAAAAAAAAAAAAAA, ILsULwmg32tiEQGqeX1rpWI9PGZXSX2Q9tRzXCD0cD/OKMMEBEXKYZhnXj1Xr9q+Dxa11iOmuXd+hx4ZTUaBCg==ing this functionality", + "test this functionality, test, BBBBBBBBBBBBBBBB, QT4wuvJSvgrxa/27gf4cZ1jzeNyiOnDxsY0oS7SsC/eVpBNyhj2I8Rh6/wCsvqRyzAvVoksTKOuRzSFUm6vAQw== this functionality", + "another test of this functionality, test, CCCCCCCCCCCCCCCC, another H9YrqOIlLtaoSCkNR2M0go3npf118KbsHFemyvJUX4+zt8FvjoiReq/0pk5va5i+7eX6XTOMwNokUUl4r+PTHw== of this functionality", + "My name is Bob and my email address is abc@example.com as of now and xyz@example.org in the future, [A-Za-z0-9+_.-]+@([\\w-]+\\.)+[\\w-] ,DDDDDDDDDDDDDDDD, My name is Bob and my email address is DdijIn6L3Cs4+PCYwCy+3bzLZ7w228quoodeI+VDlyMeFe+uZ/Ec1x/DK7MHSmZm8N5SZrINhvGgyig7aEBflg==om as of now and XQGlFjysVX1lkTFoRVCY+QEOfOf6nCoaRy5lxGAHyaFRgMGDpq93PwgZd18DZ3ZfWFRCwgPDGaExJDuRa0kkEQ==rg in the future", + }) + void testObfuscateWithPatternsAndInvalidSaltKey(String message, String pattern, String salt, String expected) { + + //adding SaltKey that cannot be found, to ensure that logic is defaulted back to the configured salt value. 
+ when(mockConfig.getSaltKey()).thenReturn(eventKeyFactory.createEventKey("id")); + when(mockConfig.getSalt()).thenReturn(salt); + when(mockConfig.getFormat()).thenReturn("SHA-512"); + + OneWayHashAction oneWayHashAction = new OneWayHashAction(mockConfig); + + Pattern compiledPattern = Pattern.compile(pattern); + List<Pattern> patterns = new ArrayList<>(); + patterns.add(compiledPattern); + String result = oneWayHashAction.obfuscate(message, patterns, createRecord(message)); + assertThat(result, equalTo(expected)); + } + + @ParameterizedTest + @CsvSource({ + "testing this functionality, test, AAAAAAAAAAAAAAAA, 8g+p3Td+ClA+PttgNrZ8Qsg+tIc9/46TwNDtLeM6lQILI8jcQzPz0bOUM4IrbTlqgHYuOD8r6j6EElj4E6dZLw==ing this functionality", + "test this functionality, test, BBBBBBBBBBBBBBBB, 8g+p3Td+ClA+PttgNrZ8Qsg+tIc9/46TwNDtLeM6lQILI8jcQzPz0bOUM4IrbTlqgHYuOD8r6j6EElj4E6dZLw== this functionality", + "another test of this functionality, test, CCCCCCCCCCCCCCCC, another 8g+p3Td+ClA+PttgNrZ8Qsg+tIc9/46TwNDtLeM6lQILI8jcQzPz0bOUM4IrbTlqgHYuOD8r6j6EElj4E6dZLw== of this functionality", + "My name is Bob and my email address is abc@example.com as of now and xyz@example.org in the future, [A-Za-z0-9+_.-]+@([\\w-]+\\.)+[\\w-] ,DDDDDDDDDDDDDDDD, My name is Bob and my email address is 9zuqdjZfSkx7Xh6rO7bxRpREOmEA8EdtlNXOSviW6C41+sAK2QE/z9PGtRTf+T4bvTuzWBVv7SKVov6jII5+gw==om as of now and KAn0LtIRQYzoPtJqHczu21+gWcXl1OUUwbT9nY+2s+6164/PG4OuW/CZJIUZvOfrUICiL6BUJE32JCEaOfrwjA==rg in the future", + }) + void testObfuscateWithPatternsAndValidSaltKey(String message, String pattern, String salt, String expected) { + + //adding a SaltKey that can be found, to ensure that the salt is computed from the record's value instead of the configured salt value. + when(mockConfig.getSaltKey()).thenReturn(eventKeyFactory.createEventKey("message")); + when(mockConfig.getSalt()).thenReturn(salt); + when(mockConfig.getFormat()).thenReturn("SHA-512"); + + OneWayHashAction oneWayHashAction = new OneWayHashAction(mockConfig); + + final Map<String, Object> testData = new HashMap<>(); + testData.put("message", message); + Pattern compiledPattern = Pattern.compile(pattern); + List<Pattern> patterns = new ArrayList<>(); + patterns.add(compiledPattern); + + String result = oneWayHashAction.obfuscate(message, patterns, createRecord("12345")); + assertThat(result, equalTo(expected)); + } + +} diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresher.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresher.java index 6859dee8be..b697fb26bf 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresher.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresher.java @@ -30,7 +30,7 @@ public OpenSearchClientRefresher(final PluginMetrics pluginMetrics, final Function<ConnectionConfiguration, OpenSearchClient> clientFunction) { this.clientFunction = clientFunction; this.currentConfig = connectionConfiguration; - this.currentClient = clientFunction.apply(connectionConfiguration); + this.currentClient = null; credentialsChangeCounter = pluginMetrics.counter(CREDENTIALS_CHANGED); clientRefreshErrorsCounter = pluginMetrics.counter(CLIENT_REFRESH_ERRORS); } @@ -44,6 +44,9 @@ public Class<ConnectionConfiguration> getComponentClass() { public OpenSearchClient get() { readWriteLock.readLock().lock(); try { + if (currentClient == null) { + currentClient = clientFunction.apply(currentConfig); + } return
currentClient; } finally { readWriteLock.readLock().unlock(); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 199b4e1e0e..1b6f44fde6 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -186,6 +186,7 @@ public OpenSearchSink(final PluginSetting pluginSetting, this.bulkRequestMap = new ConcurrentHashMap<>(); this.lastFlushTimeMap = new ConcurrentHashMap<>(); this.pluginConfigObservable = pluginConfigObservable; + this.objectMapper = new ObjectMapper(); final Optional dlqConfig = openSearchSinkConfig.getRetryConfiguration().getDlq(); if (dlqConfig.isPresent()) { @@ -201,7 +202,7 @@ public void doInitialize() { doInitializeInternal(); } catch (IOException e) { LOG.warn("Failed to initialize OpenSearch sink, retrying: {} ", e.getMessage()); - closeFiles(); + this.shutdown(); } catch (InvalidPluginConfigurationException e) { LOG.error("Failed to initialize OpenSearch sink due to a configuration error.", e); this.shutdown(); @@ -212,7 +213,7 @@ public void doInitialize() { throw e; } catch (Exception e) { LOG.warn("Failed to initialize OpenSearch sink with a retryable exception. ", e); - closeFiles(); + this.shutdown(); } } @@ -279,7 +280,6 @@ private void doInitializeInternal() throws IOException { bulkRequestSupplier, pluginSetting); - objectMapper = new ObjectMapper(); this.initialized = true; LOG.info("Initialized OpenSearch sink"); } @@ -615,6 +615,7 @@ private void closeFiles() { public void shutdown() { super.shutdown(); closeFiles(); + openSearchClient.shutdown(); } private void maybeUpdateServerlessNetworkPolicy() { diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresherTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresherTest.java index b9326c606f..584051dff6 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresherTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresherTest.java @@ -71,6 +71,7 @@ void testGet() { @Test void testGetAfterUpdateWithDeprecatedBasicAuthUnchanged() { final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest(); + assertThat(objectUnderTest.get(), equalTo(openSearchClient)); verify(clientFunction, times(1)).apply(any()); when(connectionConfiguration.getUsername()).thenReturn(TEST_USERNAME); when(connectionConfiguration.getPassword()).thenReturn(TEST_PASSWORD); @@ -91,6 +92,7 @@ void testGetAfterUpdateWithDeprecatedBasicAuthUnchanged() { @Test void testGetAfterUpdateWithBasicAuthUnchanged() { final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest(); + assertThat(objectUnderTest.get(), equalTo(openSearchClient)); verify(clientFunction, times(1)).apply(any()); when(connectionConfiguration.getAuthConfig()).thenReturn(authConfig); when(authConfig.getUsername()).thenReturn(TEST_USERNAME); @@ -115,6 +117,7 @@ void testGetAfterUpdateWithBasicAuthUnchanged() { void testGetAfterUpdateWithDeprecatedUsernameChanged() { 
when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter); final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest(); + assertThat(objectUnderTest.get(), equalTo(openSearchClient)); verify(clientFunction, times(1)).apply(any()); assertThat(objectUnderTest.get(), equalTo(openSearchClient)); when(connectionConfiguration.getUsername()).thenReturn(TEST_USERNAME); @@ -138,6 +141,7 @@ void testGetAfterUpdateWithDeprecatedUsernameChanged() { void testGetAfterUpdateWithUsernameChanged() { when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter); final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest(); + assertThat(objectUnderTest.get(), equalTo(openSearchClient)); verify(clientFunction, times(1)).apply(any()); assertThat(objectUnderTest.get(), equalTo(openSearchClient)); when(connectionConfiguration.getAuthConfig()).thenReturn(authConfig); @@ -165,6 +169,7 @@ void testGetAfterUpdateWithUsernameChanged() { void testGetAfterUpdateWithDeprecatedPasswordChanged() { when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter); final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest(); + assertThat(objectUnderTest.get(), equalTo(openSearchClient)); verify(clientFunction, times(1)).apply(any()); assertThat(objectUnderTest.get(), equalTo(openSearchClient)); when(connectionConfiguration.getUsername()).thenReturn(TEST_USERNAME); @@ -190,6 +195,7 @@ void testGetAfterUpdateWithDeprecatedPasswordChanged() { void testGetAfterUpdateWithPasswordChanged() { when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter); final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest(); + assertThat(objectUnderTest.get(), equalTo(openSearchClient)); verify(clientFunction, times(1)).apply(any()); assertThat(objectUnderTest.get(), equalTo(openSearchClient)); when(connectionConfiguration.getAuthConfig()).thenReturn(authConfig); @@ -219,6 +225,7 @@ void testGetAfterUpdateClientFailure() { when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter); when(pluginMetrics.counter(CLIENT_REFRESH_ERRORS)).thenReturn(clientRefreshErrorsCounter); final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest(); + assertThat(objectUnderTest.get(), equalTo(openSearchClient)); verify(clientFunction, times(1)).apply(any()); assertThat(objectUnderTest.get(), equalTo(openSearchClient)); when(connectionConfiguration.getUsername()).thenReturn(TEST_USERNAME); @@ -240,4 +247,4 @@ void testGetAfterUpdateClientFailure() { verify(clientRefreshErrorsCounter).increment(); verify(clientFunction, times(2)).apply(any()); } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/rds-source/build.gradle b/data-prepper-plugins/rds-source/build.gradle index 6d6a681646..14c851f645 100644 --- a/data-prepper-plugins/rds-source/build.gradle +++ b/data-prepper-plugins/rds-source/build.gradle @@ -26,4 +26,7 @@ dependencies { testImplementation project(path: ':data-prepper-test-common') testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' testImplementation project(path: ':data-prepper-test-event') + testImplementation libs.avro.core + testImplementation libs.parquet.hadoop + testImplementation libs.parquet.avro } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java 
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java
index 982751a3db..73b71e0085 100644
--- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java
@@ -88,7 +88,7 @@ public void start(Buffer<Record<Event>> buffer) {
             exportScheduler = new ExportScheduler(
                     sourceCoordinator, snapshotManager, exportTaskManager, s3Client, pluginMetrics);
             dataFileScheduler = new DataFileScheduler(
-                    sourceCoordinator, sourceConfig, s3Client, eventFactory, buffer);
+                    sourceCoordinator, sourceConfig, s3Client, eventFactory, buffer, pluginMetrics);
             runnableList.add(exportScheduler);
             runnableList.add(dataFileScheduler);
         }
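RdsService now hands the same PluginMetrics instance to DataFileScheduler that ExportScheduler already receives. The pattern throughout this change set is plain constructor injection: resolve named meters once, up front, and keep them as fields for the hot path. A small sketch with an illustrative worker and metric name:

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;

class ExampleWorker implements Runnable {
    static final String RECORDS_PROCESSED = "recordsProcessed";

    private final Counter recordsProcessedCounter;

    ExampleWorker(final PluginMetrics pluginMetrics) {
        // resolved once; increments later are cheap and allocation-free
        this.recordsProcessedCounter = pluginMetrics.counter(RECORDS_PROCESSED);
    }

    @Override
    public void run() {
        recordsProcessedCounter.increment();
    }
}

Injecting PluginMetrics (rather than creating it inside the worker) is also what lets the new tests below substitute mocked counters.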
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java
index f12d44a75f..42a5b3a0d5 100644
--- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java
@@ -5,7 +5,10 @@
 package org.opensearch.dataprepper.plugins.source.rds.export;
 
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.DistributionSummary;
 import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.codec.InputCodec;
 import org.opensearch.dataprepper.model.event.Event;
 import org.opensearch.dataprepper.model.record.Record;
@@ -18,12 +21,18 @@
 import java.io.InputStream;
 import java.time.Duration;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class DataFileLoader implements Runnable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DataFileLoader.class);
 
     static final Duration VERSION_OVERLAP_TIME_FOR_EXPORT = Duration.ofMinutes(5);
+    static final String EXPORT_RECORDS_TOTAL_COUNT = "exportRecordsTotal";
+    static final String EXPORT_RECORDS_PROCESSED_COUNT = "exportRecordsProcessed";
+    static final String EXPORT_RECORDS_PROCESSING_ERROR_COUNT = "exportRecordsProcessingErrors";
+    static final String BYTES_RECEIVED = "bytesReceived";
+    static final String BYTES_PROCESSED = "bytesProcessed";
 
     private final DataFilePartition dataFilePartition;
     private final String bucket;
@@ -32,12 +41,18 @@ public class DataFileLoader implements Runnable {
     private final InputCodec codec;
     private final BufferAccumulator<Record<Event>> bufferAccumulator;
     private final ExportRecordConverter recordConverter;
+    private final Counter exportRecordsTotalCounter;
+    private final Counter exportRecordSuccessCounter;
+    private final Counter exportRecordErrorCounter;
+    private final DistributionSummary bytesReceivedSummary;
+    private final DistributionSummary bytesProcessedSummary;
 
     private DataFileLoader(final DataFilePartition dataFilePartition,
-                           final InputCodec codec,
-                           final BufferAccumulator<Record<Event>> bufferAccumulator,
-                           final S3ObjectReader objectReader,
-                           final ExportRecordConverter recordConverter) {
+                           final InputCodec codec,
+                           final BufferAccumulator<Record<Event>> bufferAccumulator,
+                           final S3ObjectReader objectReader,
+                           final ExportRecordConverter recordConverter,
+                           final PluginMetrics pluginMetrics) {
         this.dataFilePartition = dataFilePartition;
         bucket = dataFilePartition.getBucket();
         objectKey = dataFilePartition.getKey();
@@ -45,24 +60,37 @@ private DataFileLoader(final DataFilePartition dataFilePartition,
         this.codec = codec;
         this.bufferAccumulator = bufferAccumulator;
         this.recordConverter = recordConverter;
+
+        exportRecordsTotalCounter = pluginMetrics.counter(EXPORT_RECORDS_TOTAL_COUNT);
+        exportRecordSuccessCounter = pluginMetrics.counter(EXPORT_RECORDS_PROCESSED_COUNT);
+        exportRecordErrorCounter = pluginMetrics.counter(EXPORT_RECORDS_PROCESSING_ERROR_COUNT);
+        bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED);
+        bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED);
     }
 
     public static DataFileLoader create(final DataFilePartition dataFilePartition,
                                         final InputCodec codec,
                                         final BufferAccumulator<Record<Event>> bufferAccumulator,
                                         final S3ObjectReader objectReader,
-                                        final ExportRecordConverter recordConverter) {
-        return new DataFileLoader(dataFilePartition, codec, bufferAccumulator, objectReader, recordConverter);
+                                        final ExportRecordConverter recordConverter,
+                                        final PluginMetrics pluginMetrics) {
+        return new DataFileLoader(dataFilePartition, codec, bufferAccumulator, objectReader, recordConverter, pluginMetrics);
     }
 
     @Override
     public void run() {
         LOG.info("Start loading s3://{}/{}", bucket, objectKey);
+        AtomicLong eventCount = new AtomicLong();
 
         try (InputStream inputStream = objectReader.readFile(bucket, objectKey)) {
-
             codec.parse(inputStream, record -> {
                 try {
+                    exportRecordsTotalCounter.increment();
+                    final Event event = record.getData();
+                    final String string = event.toJsonString();
+                    final long bytes = string.getBytes().length;
+                    bytesReceivedSummary.record(bytes);
+
                     DataFileProgressState progressState = dataFilePartition.getProgressState().get();
 
                     // TODO: primary key to be obtained by querying database schema
@@ -79,6 +107,8 @@ public void run() {
                             snapshotTime,
                             eventVersionNumber));
                     bufferAccumulator.add(transformedRecord);
+                    eventCount.getAndIncrement();
+                    bytesProcessedSummary.record(bytes);
                 } catch (Exception e) {
                     throw new RuntimeException(e);
                 }
@@ -92,8 +122,10 @@ public void run() {
 
         try {
             bufferAccumulator.flush();
+            exportRecordSuccessCounter.increment(eventCount.get());
         } catch (Exception e) {
             LOG.error("Failed to write events to buffer", e);
+            exportRecordErrorCounter.increment(eventCount.get());
         }
     }
 }
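Putting the DataFileLoader changes together: every parsed record bumps the total counter and a bytes summary, while the per-batch event count is credited to the processed or error counter only once the flush outcome is known. A runnable sketch of the same accounting with plain Micrometer and a stand-in record stream:

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class ExportAccountingSketch {
    public static void main(String[] args) {
        SimpleMeterRegistry registry = new SimpleMeterRegistry();
        Counter total = registry.counter("exportRecordsTotal");
        Counter processed = registry.counter("exportRecordsProcessed");
        Counter errors = registry.counter("exportRecordsProcessingErrors");
        DistributionSummary bytesReceived = registry.summary("bytesReceived");

        AtomicLong eventCount = new AtomicLong();
        for (String record : List.of("{\"a\":1}", "{\"b\":2}")) {   // stand-in for codec.parse(...)
            total.increment();
            bytesReceived.record(record.getBytes().length);
            eventCount.getAndIncrement();                           // buffered, not yet flushed
        }

        try {
            // bufferAccumulator.flush() stand-in; a real flush can throw,
            // which is why the success credit happens only after it returns
            processed.increment(eventCount.get());
        } catch (Exception e) {
            errors.increment(eventCount.get());
        }

        System.out.println("processed=" + processed.count());
    }
}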
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java
index 0a2b2fb638..f766aec3d2 100644
--- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java
@@ -5,7 +5,9 @@
 package org.opensearch.dataprepper.plugins.source.rds.export;
 
+import io.micrometer.core.instrument.Counter;
 import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.buffer.Buffer;
 import org.opensearch.dataprepper.model.codec.InputCodec;
 import org.opensearch.dataprepper.model.event.Event;
@@ -48,6 +50,9 @@ public class DataFileScheduler implements Runnable {
 
     static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60);
     static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000;
+    static final String EXPORT_S3_OBJECTS_PROCESSED_COUNT = "exportS3ObjectsProcessed";
+    static final String EXPORT_S3_OBJECTS_ERROR_COUNT = "exportS3ObjectsErrors";
+    static final String ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE = "activeExportS3ObjectConsumers";
 
     private final EnhancedSourceCoordinator sourceCoordinator;
@@ -57,6 +62,11 @@ public class DataFileScheduler implements Runnable {
     private final InputCodec codec;
     private final BufferAccumulator<Record<Event>> bufferAccumulator;
     private final ExportRecordConverter recordConverter;
+    private final PluginMetrics pluginMetrics;
+
+    private final Counter exportFileSuccessCounter;
+    private final Counter exportFileErrorCounter;
+    private final AtomicInteger activeExportS3ObjectConsumersGauge;
 
     private volatile boolean shutdownRequested = false;
 
@@ -64,7 +74,8 @@ public DataFileScheduler(final EnhancedSourceCoordinator sourceCoordinator,
                              final RdsSourceConfig sourceConfig,
                              final S3Client s3Client,
                              final EventFactory eventFactory,
-                             final Buffer<Record<Event>> buffer) {
+                             final Buffer<Record<Event>> buffer,
+                             final PluginMetrics pluginMetrics) {
         this.sourceCoordinator = sourceCoordinator;
         this.sourceConfig = sourceConfig;
         codec = new ParquetInputCodec(eventFactory);
@@ -72,6 +83,12 @@ public DataFileScheduler(final EnhancedSourceCoordinator sourceCoordinator,
         objectReader = new S3ObjectReader(s3Client);
         recordConverter = new ExportRecordConverter();
         executor = Executors.newFixedThreadPool(DATA_LOADER_MAX_JOB_COUNT);
+        this.pluginMetrics = pluginMetrics;
+
+        this.exportFileSuccessCounter = pluginMetrics.counter(EXPORT_S3_OBJECTS_PROCESSED_COUNT);
+        this.exportFileErrorCounter = pluginMetrics.counter(EXPORT_S3_OBJECTS_ERROR_COUNT);
+        this.activeExportS3ObjectConsumersGauge = pluginMetrics.gauge(
+                ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE, numOfWorkers, AtomicInteger::get);
     }
 
     @Override
@@ -116,15 +133,18 @@ public void shutdown() {
     }
 
     private void processDataFilePartition(DataFilePartition dataFilePartition) {
-        Runnable loader = DataFileLoader.create(dataFilePartition, codec, bufferAccumulator, objectReader, recordConverter);
+        Runnable loader = DataFileLoader.create(
+                dataFilePartition, codec, bufferAccumulator, objectReader, recordConverter, pluginMetrics);
         CompletableFuture runLoader = CompletableFuture.runAsync(loader, executor);
 
         runLoader.whenComplete((v, ex) -> {
             if (ex == null) {
+                exportFileSuccessCounter.increment();
                 // Update global state so we know if all s3 files have been loaded
                 updateLoadStatus(dataFilePartition.getExportTaskId(), DEFAULT_UPDATE_LOAD_STATUS_TIMEOUT);
                 sourceCoordinator.completePartition(dataFilePartition);
             } else {
+                exportFileErrorCounter.increment();
                 LOG.error("There was an exception while processing an S3 data file", ex);
                 sourceCoordinator.giveUpPartition(dataFilePartition);
             }
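The scheduler attributes success or failure per S3 object from a single whenComplete callback, where a null throwable means the loader completed normally. A runnable sketch of that hook (registry and loader are stand-ins for PluginMetrics and DataFileLoader):

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionMetricsSketch {
    public static void main(String[] args) {
        SimpleMeterRegistry registry = new SimpleMeterRegistry();
        Counter success = registry.counter("exportS3ObjectsProcessed");
        Counter error = registry.counter("exportS3ObjectsErrors");
        ExecutorService executor = Executors.newFixedThreadPool(1);

        Runnable loader = () -> { /* load one S3 object */ };
        CompletableFuture.runAsync(loader, executor)
                .whenComplete((v, ex) -> {
                    if (ex == null) {
                        success.increment();   // loader finished normally
                    } else {
                        error.increment();     // loader threw; ex wraps the cause
                    }
                })
                .join();

        executor.shutdown();
        System.out.println("success=" + success.count() + " error=" + error.count());
    }
}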
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportScheduler.java
index 79ef3d5a61..343ade8b85 100644
--- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportScheduler.java
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportScheduler.java
@@ -5,6 +5,7 @@
 package org.opensearch.dataprepper.plugins.source.rds.export;
 
+import io.micrometer.core.instrument.Counter;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
 import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
@@ -42,12 +43,15 @@ public class ExportScheduler implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(ExportScheduler.class);
     private static final int DEFAULT_TAKE_LEASE_INTERVAL_MILLIS = 60_000;
-    private static final Duration DEFAULT_CLOSE_DURATION = Duration.ofMinutes(10);
-    private static final int DEFAULT_MAX_CLOSE_COUNT = 36;
+    static final Duration DEFAULT_CLOSE_DURATION = Duration.ofMinutes(10);
+    static final int DEFAULT_MAX_CLOSE_COUNT = 36;
     private static final int DEFAULT_CHECKPOINT_INTERVAL_MILLS = 5 * 60_000;
     private static final int DEFAULT_CHECK_STATUS_INTERVAL_MILLS = 30 * 1000;
     private static final Duration DEFAULT_SNAPSHOT_STATUS_CHECK_TIMEOUT = Duration.ofMinutes(60);
     static final String PARQUET_SUFFIX = ".parquet";
+    static final String EXPORT_JOB_SUCCESS_COUNT = "exportJobSuccess";
+    static final String EXPORT_JOB_FAILURE_COUNT = "exportJobFailure";
+    static final String EXPORT_S3_OBJECTS_TOTAL_COUNT = "exportS3ObjectsTotal";
 
     private final S3Client s3Client;
     private final PluginMetrics pluginMetrics;
@@ -56,6 +60,10 @@ public class ExportScheduler implements Runnable {
     private final ExportTaskManager exportTaskManager;
     private final SnapshotManager snapshotManager;
 
+    private final Counter exportJobSuccessCounter;
+    private final Counter exportJobFailureCounter;
+    private final Counter exportS3ObjectsTotalCounter;
+
     private volatile boolean shutdownRequested = false;
 
     public ExportScheduler(final EnhancedSourceCoordinator sourceCoordinator,
@@ -69,6 +77,10 @@ public ExportScheduler(final EnhancedSourceCoordinator sourceCoordinator,
         this.executor = Executors.newCachedThreadPool();
         this.snapshotManager = snapshotManager;
         this.exportTaskManager = exportTaskManager;
+
+        exportJobSuccessCounter = pluginMetrics.counter(EXPORT_JOB_SUCCESS_COUNT);
+        exportJobFailureCounter = pluginMetrics.counter(EXPORT_JOB_FAILURE_COUNT);
+        exportS3ObjectsTotalCounter = pluginMetrics.counter(EXPORT_S3_OBJECTS_TOTAL_COUNT);
     }
 
     @Override
@@ -133,8 +145,7 @@ private String getOrCreateExportTaskId(ExportPartition exportPartition) {
             progressState.setSnapshotId(snapshotInfo.getSnapshotId());
             sourceCoordinator.saveProgressStateForPartition(exportPartition, null);
         } else {
-            LOG.error("The snapshot failed to create, it will be retried");
-            closeExportPartitionWithError(exportPartition);
+            LOG.error("The snapshot failed to create. The export will be retried");
             return null;
         }
@@ -142,8 +153,7 @@ private String getOrCreateExportTaskId(ExportPartition exportPartition) {
         try {
             snapshotInfo = checkSnapshotStatus(snapshotId, DEFAULT_SNAPSHOT_STATUS_CHECK_TIMEOUT);
         } catch (Exception e) {
-            LOG.warn("Check snapshot status for {} failed", snapshotId, e);
-            sourceCoordinator.giveUpPartition(exportPartition);
+            LOG.warn("Check snapshot status for {} failed. The export will be retried", snapshotId, e);
             return null;
         }
         progressState.setSnapshotTime(snapshotInfo.getCreateTime().toEpochMilli());
@@ -159,7 +169,6 @@ private String getOrCreateExportTaskId(ExportPartition exportPartition) {
             sourceCoordinator.saveProgressStateForPartition(exportPartition, null);
         } else {
             LOG.error("The export task failed to create, it will be retried");
-            closeExportPartitionWithError(exportPartition);
             return null;
         }
@@ -167,6 +176,7 @@ private String getOrCreateExportTaskId(ExportPartition exportPartition) {
     }
 
     private void closeExportPartitionWithError(ExportPartition exportPartition) {
+        exportJobFailureCounter.increment();
         ExportProgressState exportProgressState = exportPartition.getProgressState().get();
         // Clear current task id, so that a new export can be submitted.
         exportProgressState.setExportTaskId(null);
@@ -309,12 +319,15 @@ private void createDataFilePartitions(String bucket, String exportTaskId, List<
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java
     private final BufferAccumulator<Record<Event>> bufferAccumulator;
     private final List<String> tableNames;
     private final String s3Prefix;
+    private final PluginMetrics pluginMetrics;
 
-    public BinlogEventListener(final Buffer<Record<Event>> buffer, final RdsSourceConfig sourceConfig) {
+    private final Counter changeEventSuccessCounter;
+    private final Counter changeEventErrorCounter;
+    private final DistributionSummary bytesReceivedSummary;
+    private final DistributionSummary bytesProcessedSummary;
+
+    public BinlogEventListener(final Buffer<Record<Event>> buffer,
+                               final RdsSourceConfig sourceConfig,
+                               final PluginMetrics pluginMetrics) {
         tableMetadataMap = new HashMap<>();
         recordConverter = new StreamRecordConverter(sourceConfig.getStream().getPartitionCount());
         bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
         s3Prefix = sourceConfig.getS3Prefix();
         tableNames = sourceConfig.getTableNames();
+        this.pluginMetrics = pluginMetrics;
+
+        changeEventSuccessCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSED_COUNT);
+        changeEventErrorCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSING_ERROR_COUNT);
+        bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED);
+        bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED);
     }
 
     @Override
@@ -95,7 +116,9 @@ void handleTableMapEvent(com.github.shyiko.mysql.binlog.event.Event event) {
     }
 
     void handleInsertEvent(com.github.shyiko.mysql.binlog.event.Event event) {
-        // get new row data from the event
+        final long bytes = event.toString().getBytes().length;
+        bytesReceivedSummary.record(bytes);
+
         LOG.debug("Handling insert event");
         final WriteRowsEventData data = event.getData();
         if (!tableMetadataMap.containsKey(data.getTableId())) {
@@ -113,6 +136,7 @@ void handleInsertEvent(com.github.shyiko.mysql.binlog.event.Event event) {
         final long eventTimestampMillis = event.getHeader().getTimestamp();
 
         // Construct data prepper JacksonEvent
+        int eventCount = 0;
         for (final Object[] rowDataArray : data.getRows()) {
             final Map<String, Object> rowDataMap = new HashMap<>();
             for (int i = 0; i < rowDataArray.length; i++) {
@@ -130,12 +154,17 @@ void handleInsertEvent(com.github.shyiko.mysql.binlog.event.Event event) {
                     eventTimestampMillis,
                     eventTimestampMillis);
             addToBuffer(new Record<>(pipelineEvent));
+            eventCount++;
         }
+        bytesProcessedSummary.record(bytes);
 
-        flushBuffer();
+        flushBuffer(eventCount);
     }
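handleInsertEvent sizes the whole binlog event once, emits one pipeline record per row, and passes the row count to flushBuffer so counters move in batch units; the update and delete handlers below follow the same recipe. A stand-in sketch of that accounting:

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

import java.util.List;

public class BinlogAccountingSketch {
    public static void main(String[] args) {
        SimpleMeterRegistry registry = new SimpleMeterRegistry();
        DistributionSummary bytesReceived = registry.summary("bytesReceived");
        DistributionSummary bytesProcessed = registry.summary("bytesProcessed");
        Counter success = registry.counter("changeEventsProcessed");
        Counter errors = registry.counter("changeEventsProcessingErrors");

        List<Object[]> rows = List.of(new Object[]{1, "a"}, new Object[]{2, "b"}); // one binlog event, two rows
        long bytes = rows.toString().getBytes().length;   // stand-in for event.toString() sizing
        bytesReceived.record(bytes);                      // recorded once per binlog event

        int eventCount = 0;
        for (Object[] row : rows) {
            eventCount++;                                 // one pipeline record per row, buffered here
        }
        bytesProcessed.record(bytes);

        try {
            // bufferAccumulator.flush() stand-in
            success.increment(eventCount);                // whole batch credited on successful flush
        } catch (Exception e) {
            errors.increment(eventCount);
        }
        System.out.println("processed=" + success.count());
    }
}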
     void handleUpdateEvent(com.github.shyiko.mysql.binlog.event.Event event) {
+        final long bytes = event.toString().getBytes().length;
+        bytesReceivedSummary.record(bytes);
+
         LOG.debug("Handling update event");
         final UpdateRowsEventData data = event.getData();
         if (!tableMetadataMap.containsKey(data.getTableId())) {
@@ -151,6 +180,7 @@ void handleUpdateEvent(com.github.shyiko.mysql.binlog.event.Event event) {
         final List<String> primaryKeys = tableMetadata.getPrimaryKeys();
         final long eventTimestampMillis = event.getHeader().getTimestamp();
 
+        int eventCount = 0;
         for (Map.Entry updatedRow : data.getRows()) {
             // updatedRow contains data before update as key and data after update as value
             final Object[] rowData = updatedRow.getValue();
@@ -171,12 +201,17 @@ void handleUpdateEvent(com.github.shyiko.mysql.binlog.event.Event event) {
                     eventTimestampMillis,
                     eventTimestampMillis);
             addToBuffer(new Record<>(pipelineEvent));
+            eventCount++;
         }
+        bytesProcessedSummary.record(bytes);
 
-        flushBuffer();
+        flushBuffer(eventCount);
     }
 
     void handleDeleteEvent(com.github.shyiko.mysql.binlog.event.Event event) {
+        final long bytes = event.toString().getBytes().length;
+        bytesReceivedSummary.record(bytes);
+
         LOG.debug("Handling delete event");
         final DeleteRowsEventData data = event.getData();
         if (!tableMetadataMap.containsKey(data.getTableId())) {
@@ -193,6 +228,7 @@ void handleDeleteEvent(com.github.shyiko.mysql.binlog.event.Event event) {
         final List<String> primaryKeys = tableMetadata.getPrimaryKeys();
         final long eventTimestampMillis = event.getHeader().getTimestamp();
 
+        int eventCount = 0;
         for (Object[] rowDataArray : data.getRows()) {
             final Map<String, Object> rowDataMap = new HashMap<>();
             for (int i = 0; i < rowDataArray.length; i++) {
@@ -210,9 +246,11 @@ void handleDeleteEvent(com.github.shyiko.mysql.binlog.event.Event event) {
                     eventTimestampMillis,
                     eventTimestampMillis);
             addToBuffer(new Record<>(pipelineEvent));
+            eventCount++;
         }
+        bytesProcessedSummary.record(bytes);
 
-        flushBuffer();
+        flushBuffer(eventCount);
     }
 
     private boolean isTableOfInterest(String tableName) {
@@ -227,11 +265,13 @@ private void addToBuffer(final Record<Event> record) {
         }
     }
 
-    private void flushBuffer() {
+    private void flushBuffer(int eventCount) {
         try {
             bufferAccumulator.flush();
+            changeEventSuccessCounter.increment(eventCount);
         } catch (Exception e) {
             LOG.error("Failed to flush buffer", e);
+            changeEventErrorCounter.increment(eventCount);
         }
     }
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java
index 0b42c95c38..acb4ea3f85 100644
--- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java
@@ -41,8 +41,9 @@ public StreamScheduler(final EnhancedSourceCoordinator sourceCoordinator,
         this.sourceCoordinator = sourceCoordinator;
         this.sourceConfig = sourceConfig;
         this.binaryLogClient = binaryLogClient;
-        this.binaryLogClient.registerEventListener(new BinlogEventListener(buffer, sourceConfig));
+        this.binaryLogClient.registerEventListener(new BinlogEventListener(buffer, sourceConfig, pluginMetrics));
         this.pluginMetrics = pluginMetrics;
+
     }
 
     @Override
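StreamScheduler constructs the listener with pluginMetrics before registering it on the binlog client. A sketch of that wiring, assuming the shyiko/osheroff BinaryLogClient API and a placeholder endpoint:

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;

public class ListenerWiringSketch {
    public static void main(String[] args) {
        BinaryLogClient client = new BinaryLogClient("localhost", 3306, "user", "pass");
        client.registerEventListener(new BinaryLogClient.EventListener() {
            @Override
            public void onEvent(Event event) {
                // a real listener dispatches on event.getHeader().getEventType()
                // and records metrics per handler, as BinlogEventListener does above
            }
        });
        // client.connect() omitted; it requires a reachable MySQL endpoint
    }
}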
diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoaderTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoaderTest.java
index 1ed91bc031..ccb36347fa 100644
--- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoaderTest.java
+++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoaderTest.java
@@ -5,38 +5,61 @@
 package org.opensearch.dataprepper.plugins.source.rds.export;
 
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.DistributionSummary;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Answers;
 import org.mockito.Mock;
+import org.mockito.MockedStatic;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.codec.InputCodec;
+import org.opensearch.dataprepper.model.event.BaseEventBuilder;
 import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.event.EventBuilder;
+import org.opensearch.dataprepper.model.event.EventFactory;
+import org.opensearch.dataprepper.model.io.InputFile;
 import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.plugins.codec.parquet.ParquetInputCodec;
 import org.opensearch.dataprepper.plugins.source.rds.converter.ExportRecordConverter;
 import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition;
+import org.opensearch.dataprepper.plugins.source.rds.coordination.state.DataFileProgressState;
 
 import java.io.InputStream;
+import java.util.Optional;
 import java.util.UUID;
-import java.util.function.Consumer;
 
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.opensearch.dataprepper.plugins.source.rds.export.DataFileLoader.BYTES_PROCESSED;
+import static org.opensearch.dataprepper.plugins.source.rds.export.DataFileLoader.BYTES_RECEIVED;
+import static org.opensearch.dataprepper.plugins.source.rds.export.DataFileLoader.EXPORT_RECORDS_PROCESSED_COUNT;
+import static org.opensearch.dataprepper.plugins.source.rds.export.DataFileLoader.EXPORT_RECORDS_PROCESSING_ERROR_COUNT;
+import static org.opensearch.dataprepper.plugins.source.rds.export.DataFileLoader.EXPORT_RECORDS_TOTAL_COUNT;
 
 @ExtendWith(MockitoExtension.class)
 class DataFileLoaderTest {
 
-    @Mock
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private DataFilePartition dataFilePartition;
 
     @Mock
     private BufferAccumulator<Record<Event>> bufferAccumulator;
 
     @Mock
-    private InputCodec codec;
+    private EventFactory eventFactory;
 
     @Mock
     private S3ObjectReader s3ObjectReader;
 
@@ -44,24 +67,120 @@ class DataFileLoaderTest {
     @Mock
     private ExportRecordConverter recordConverter;
 
+    @Mock
+    private PluginMetrics pluginMetrics;
+
+    @Mock
+    private Counter exportRecordsTotalCounter;
+
+    @Mock
+    private Counter exportRecordSuccessCounter;
+
+    @Mock
+    private Counter exportRecordErrorCounter;
+
+    @Mock
+    private DistributionSummary bytesReceivedSummary;
+
+    @Mock
+    private DistributionSummary bytesProcessedSummary;
+
+    @BeforeEach
+    void setUp() {
+        when(pluginMetrics.counter(EXPORT_RECORDS_TOTAL_COUNT)).thenReturn(exportRecordsTotalCounter);
+        when(pluginMetrics.counter(EXPORT_RECORDS_PROCESSED_COUNT)).thenReturn(exportRecordSuccessCounter);
+        when(pluginMetrics.counter(EXPORT_RECORDS_PROCESSING_ERROR_COUNT)).thenReturn(exportRecordErrorCounter);
+        when(pluginMetrics.summary(BYTES_RECEIVED)).thenReturn(bytesReceivedSummary);
+        when(pluginMetrics.summary(BYTES_PROCESSED)).thenReturn(bytesProcessedSummary);
+    }
+
+    @Test
+    void test_run_success() throws Exception {
+        final String bucket = UUID.randomUUID().toString();
+        final String key = UUID.randomUUID().toString();
+        when(dataFilePartition.getBucket()).thenReturn(bucket);
+        when(dataFilePartition.getKey()).thenReturn(key);
+        final DataFileProgressState progressState = mock(DataFileProgressState.class, RETURNS_DEEP_STUBS);
+        when(dataFilePartition.getProgressState()).thenReturn(Optional.of(progressState));
+
+        InputStream inputStream = mock(InputStream.class);
+        when(s3ObjectReader.readFile(bucket, key)).thenReturn(inputStream);
+
+        DataFileLoader dataFileLoader = createObjectUnderTest();
+
+        final String randomString = UUID.randomUUID().toString();
+        final long sizeBytes = randomString.getBytes().length;
+        final BaseEventBuilder eventBuilder = mock(EventBuilder.class, RETURNS_DEEP_STUBS);
+        final Event event = mock(Event.class);
+        when(eventFactory.eventBuilder(any())).thenReturn(eventBuilder);
+        when(eventBuilder.withEventType(any()).withData(any()).build()).thenReturn(event);
+        when(event.toJsonString()).thenReturn(randomString);
+
+        try (MockedStatic readerMockedStatic = mockStatic(AvroParquetReader.class)) {
+            ParquetReader parquetReader = mock(ParquetReader.class);
+            AvroParquetReader.Builder builder = mock(AvroParquetReader.Builder.class);
+            readerMockedStatic.when(() -> AvroParquetReader.builder(any(InputFile.class), any())).thenReturn(builder);
+            when(builder.build()).thenReturn(parquetReader);
+            when(parquetReader.read()).thenReturn(mock(GenericRecord.class, RETURNS_DEEP_STUBS), null);
+
+            dataFileLoader.run();
+        }
+
+        verify(bufferAccumulator).add(any(Record.class));
+        verify(bufferAccumulator).flush();
+
+        verify(exportRecordsTotalCounter).increment();
+        verify(bytesReceivedSummary).record(sizeBytes);
+        verify(bytesProcessedSummary).record(sizeBytes);
+        verify(exportRecordSuccessCounter).increment(1);
+        verify(exportRecordErrorCounter, never()).increment(1);
+    }
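test_run_success stubs the static AvroParquetReader.builder(...) factory inside a try-with-resources MockedStatic, which confines the stub to that block and to the creating thread. A minimal, self-contained illustration of the same mechanism with UUID.randomUUID (requires Mockito's inline mock maker, the default in Mockito 5):

import org.mockito.MockedStatic;

import java.util.UUID;

import static org.mockito.Mockito.mockStatic;

public class StaticMockSketch {
    public static void main(String[] args) {
        try (MockedStatic<UUID> uuidMock = mockStatic(UUID.class)) {
            uuidMock.when(UUID::randomUUID).thenReturn(new UUID(0L, 0L));
            System.out.println(UUID.randomUUID());   // 00000000-0000-0000-0000-000000000000
        }
        System.out.println(UUID.randomUUID());       // real again outside the scope
    }
}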
+
+    @Test
-    void test_run() throws Exception {
+    void test_flush_failure_then_error_metric_updated() throws Exception {
         final String bucket = UUID.randomUUID().toString();
         final String key = UUID.randomUUID().toString();
         when(dataFilePartition.getBucket()).thenReturn(bucket);
         when(dataFilePartition.getKey()).thenReturn(key);
+        final DataFileProgressState progressState = mock(DataFileProgressState.class, RETURNS_DEEP_STUBS);
+        when(dataFilePartition.getProgressState()).thenReturn(Optional.of(progressState));
 
         InputStream inputStream = mock(InputStream.class);
         when(s3ObjectReader.readFile(bucket, key)).thenReturn(inputStream);
 
-        DataFileLoader objectUnderTest = createObjectUnderTest();
-        objectUnderTest.run();
+        DataFileLoader dataFileLoader = createObjectUnderTest();
 
-        verify(codec).parse(eq(inputStream), any(Consumer.class));
+        final String randomString = UUID.randomUUID().toString();
+        final long sizeBytes = randomString.getBytes().length;
+        final BaseEventBuilder eventBuilder = mock(EventBuilder.class, RETURNS_DEEP_STUBS);
+        final Event event = mock(Event.class);
+        when(eventFactory.eventBuilder(any())).thenReturn(eventBuilder);
+        when(eventBuilder.withEventType(any()).withData(any()).build()).thenReturn(event);
+        when(event.toJsonString()).thenReturn(randomString);
+        doThrow(new RuntimeException("testing")).when(bufferAccumulator).flush();
+
+        try (MockedStatic readerMockedStatic = mockStatic(AvroParquetReader.class)) {
+            ParquetReader parquetReader = mock(ParquetReader.class);
+            AvroParquetReader.Builder builder = mock(AvroParquetReader.Builder.class);
+            readerMockedStatic.when(() -> AvroParquetReader.builder(any(InputFile.class), any())).thenReturn(builder);
+            when(builder.build()).thenReturn(parquetReader);
+            when(parquetReader.read()).thenReturn(mock(GenericRecord.class, RETURNS_DEEP_STUBS), null);
+
+            dataFileLoader.run();
+        }
+
+        verify(bufferAccumulator).add(any(Record.class));
         verify(bufferAccumulator).flush();
+
+        verify(exportRecordsTotalCounter).increment();
+        verify(bytesReceivedSummary).record(sizeBytes);
+        verify(bytesProcessedSummary).record(sizeBytes);
+        verify(exportRecordSuccessCounter, never()).increment(1);
+        verify(exportRecordErrorCounter).increment(1);
     }
 
     private DataFileLoader createObjectUnderTest() {
-        return DataFileLoader.create(dataFilePartition, codec, bufferAccumulator, s3ObjectReader, recordConverter);
+        final InputCodec codec = new ParquetInputCodec(eventFactory);
+        return DataFileLoader.create(dataFilePartition, codec, bufferAccumulator, s3ObjectReader, recordConverter, pluginMetrics);
     }
 }
\ No newline at end of file
diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java
index ee0d0e2852..5a5a56c6fd 100644
--- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java
+++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java
@@ -5,6 +5,7 @@
 package org.opensearch.dataprepper.plugins.source.rds.export;
 
+import io.micrometer.core.instrument.Counter;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -12,6 +13,7 @@
 import org.mockito.MockedStatic;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.buffer.Buffer;
 import org.opensearch.dataprepper.model.codec.InputCodec;
 import org.opensearch.dataprepper.model.event.Event;
@@ -32,17 +34,22 @@
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.awaitility.Awaitility.await;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
+import static org.opensearch.dataprepper.plugins.source.rds.export.DataFileScheduler.ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE;
+import static org.opensearch.dataprepper.plugins.source.rds.export.DataFileScheduler.EXPORT_S3_OBJECTS_PROCESSED_COUNT;
 
 @ExtendWith(MockitoExtension.class)
 class DataFileSchedulerTest {
@@ -62,20 +69,37 @@ class DataFileSchedulerTest {
     @Mock
     private Buffer<Record<Event>> buffer;
 
+    @Mock
+    private PluginMetrics pluginMetrics;
+
     @Mock
     private DataFilePartition dataFilePartition;
 
+    @Mock
+    private Counter exportFileSuccessCounter;
+
+    @Mock
+    private Counter exportFileErrorCounter;
+
+    @Mock
+    private AtomicInteger activeExportS3ObjectConsumersGauge;
+
     private Random random;
 
     @BeforeEach
     void setUp() {
         random = new Random();
+        when(pluginMetrics.counter(EXPORT_S3_OBJECTS_PROCESSED_COUNT)).thenReturn(exportFileSuccessCounter);
+        when(pluginMetrics.counter(eq(DataFileScheduler.EXPORT_S3_OBJECTS_ERROR_COUNT))).thenReturn(exportFileErrorCounter);
+        when(pluginMetrics.gauge(eq(ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE), any(AtomicInteger.class), any()))
+                .thenReturn(activeExportS3ObjectConsumersGauge);
     }
 
     @Test
     void test_given_no_datafile_partition_then_no_export() throws InterruptedException {
         when(sourceCoordinator.acquireAvailablePartition(DataFilePartition.PARTITION_TYPE)).thenReturn(Optional.empty());
+
         final DataFileScheduler objectUnderTest = createObjectUnderTest();
         final ExecutorService executorService = Executors.newSingleThreadExecutor();
         executorService.submit(objectUnderTest);
@@ -84,12 +108,11 @@ void test_given_no_datafile_partition_then_no_export() throws InterruptedException {
         Thread.sleep(100);
         executorService.shutdownNow();
 
-        verifyNoInteractions(s3Client, buffer);
+        verifyNoInteractions(s3Client, buffer, exportFileSuccessCounter, activeExportS3ObjectConsumersGauge);
     }
 
     @Test
     void test_given_available_datafile_partition_then_load_datafile() {
-        DataFileScheduler objectUnderTest = createObjectUnderTest();
         final String exportTaskId = UUID.randomUUID().toString();
         when(dataFilePartition.getExportTaskId()).thenReturn(exportTaskId);
@@ -100,13 +123,15 @@ void test_given_available_datafile_partition_then_load_datafile() {
         when(globalStatePartition.getProgressState()).thenReturn(Optional.of(loadStatusMap));
         when(sourceCoordinator.getPartition(exportTaskId)).thenReturn(Optional.of(globalStatePartition));
 
+        DataFileScheduler objectUnderTest = createObjectUnderTest();
         final ExecutorService executorService = Executors.newSingleThreadExecutor();
         executorService.submit(() -> {
             // MockedStatic needs to be created on the same thread it's used
             try (MockedStatic dataFileLoaderMockedStatic = mockStatic(DataFileLoader.class)) {
                 DataFileLoader dataFileLoader = mock(DataFileLoader.class);
                 dataFileLoaderMockedStatic.when(() -> DataFileLoader.create(
-                        eq(dataFilePartition), any(InputCodec.class), any(BufferAccumulator.class), any(S3ObjectReader.class), any(ExportRecordConverter.class)))
+                        eq(dataFilePartition), any(InputCodec.class), any(BufferAccumulator.class), any(S3ObjectReader.class),
+                        any(ExportRecordConverter.class), any(PluginMetrics.class)))
                         .thenReturn(dataFileLoader);
                 doNothing().when(dataFileLoader).run();
                 objectUnderTest.run();
@@ -116,9 +141,39 @@ void test_given_available_datafile_partition_then_load_datafile() {
                 .untilAsserted(() -> verify(sourceCoordinator).completePartition(dataFilePartition));
         executorService.shutdownNow();
 
+        verify(exportFileSuccessCounter).increment();
+        verify(exportFileErrorCounter, never()).increment();
         verify(sourceCoordinator).completePartition(dataFilePartition);
     }
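These scheduler tests share one recipe: submit the runnable under test to a single-thread executor, let Awaitility poll for the expected side effect, then stop the loop with shutdownNow(). A stand-in sketch of that recipe, which the failure-path test below also follows:

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.awaitility.Awaitility.await;

public class AsyncRunnableTestSketch {
    public static void main(String[] args) {
        AtomicBoolean sideEffect = new AtomicBoolean(false);
        Runnable objectUnderTest = () -> {
            while (!Thread.currentThread().isInterrupted()) {
                sideEffect.set(true);      // stand-in for completePartition(...)
            }
        };

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(objectUnderTest);
        await().atMost(Duration.ofSeconds(1))
                .untilAsserted(() -> {
                    if (!sideEffect.get()) {
                        throw new AssertionError("side effect not observed yet");
                    }
                });
        executorService.shutdownNow();     // interrupts the worker loop
    }
}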
+
+    @Test
+    void test_data_file_loader_throws_exception_then_give_up_partition() {
+
+        when(sourceCoordinator.acquireAvailablePartition(DataFilePartition.PARTITION_TYPE)).thenReturn(Optional.of(dataFilePartition));
+
+        DataFileScheduler objectUnderTest = createObjectUnderTest();
+        final ExecutorService executorService = Executors.newSingleThreadExecutor();
+        executorService.submit(() -> {
+            // MockedStatic needs to be created on the same thread it's used
+            try (MockedStatic dataFileLoaderMockedStatic = mockStatic(DataFileLoader.class)) {
+                DataFileLoader dataFileLoader = mock(DataFileLoader.class);
+                dataFileLoaderMockedStatic.when(() -> DataFileLoader.create(
+                        eq(dataFilePartition), any(InputCodec.class), any(BufferAccumulator.class), any(S3ObjectReader.class),
+                        any(ExportRecordConverter.class), any(PluginMetrics.class)))
+                        .thenReturn(dataFileLoader);
+                doThrow(new RuntimeException()).when(dataFileLoader).run();
+                objectUnderTest.run();
+            }
+        });
+        await().atMost(Duration.ofSeconds(1))
+                .untilAsserted(() -> verify(sourceCoordinator).giveUpPartition(dataFilePartition));
+        executorService.shutdownNow();
+
+        verify(exportFileSuccessCounter, never()).increment();
+        verify(exportFileErrorCounter).increment();
+        verify(sourceCoordinator).giveUpPartition(dataFilePartition);
+    }
+
     @Test
     void test_shutdown() {
         DataFileScheduler objectUnderTest = createObjectUnderTest();
@@ -132,6 +187,6 @@ void test_shutdown() {
     }
 
     private DataFileScheduler createObjectUnderTest() {
-        return new DataFileScheduler(sourceCoordinator, sourceConfig, s3Client, eventFactory, buffer);
+        return new DataFileScheduler(sourceCoordinator, sourceConfig, s3Client, eventFactory, buffer, pluginMetrics);
     }
 }
\ No newline at end of file
diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportSchedulerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportSchedulerTest.java
index a1a520a47a..f5036e8890 100644
--- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportSchedulerTest.java
+++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportSchedulerTest.java
@@ -6,6 +6,7 @@
 package org.opensearch.dataprepper.plugins.source.rds.export;
 
+import io.micrometer.core.instrument.Counter;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -42,6 +43,11 @@
 import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
+import static org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler.DEFAULT_CLOSE_DURATION;
+import static org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler.DEFAULT_MAX_CLOSE_COUNT;
+import static org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler.EXPORT_JOB_FAILURE_COUNT;
+import static org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler.EXPORT_JOB_SUCCESS_COUNT;
+import static org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler.EXPORT_S3_OBJECTS_TOTAL_COUNT;
 import static org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler.PARQUET_SUFFIX;
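The static imports of DEFAULT_CLOSE_DURATION and DEFAULT_MAX_CLOSE_COUNT work because the earlier ExportScheduler hunk relaxed those constants from private to package-private; the test lives in the same package. An illustrative sketch of that visibility choice:

class Scheduler {
    static final int DEFAULT_MAX_CLOSE_COUNT = 36;   // package-private, visible to same-package tests
}

class SchedulerTest {                                 // same package as Scheduler
    void assertUsesSharedConstant() {
        // the test asserts against the production constant instead of duplicating the value
        if (Scheduler.DEFAULT_MAX_CLOSE_COUNT != 36) {
            throw new AssertionError("unexpected close count");
        }
    }
}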
@@ -63,6 +69,15 @@ class ExportSchedulerTest {
     @Mock
     private PluginMetrics pluginMetrics;
 
+    @Mock
+    private Counter exportJobSuccessCounter;
+
+    @Mock
+    private Counter exportJobFailureCounter;
+
+    @Mock
+    private Counter exportS3ObjectsTotalCounter;
+
     @Mock
     private ExportPartition exportPartition;
 
@@ -73,6 +88,10 @@ class ExportSchedulerTest {
 
     @BeforeEach
     void setUp() {
+        when(pluginMetrics.counter(EXPORT_JOB_SUCCESS_COUNT)).thenReturn(exportJobSuccessCounter);
+        when(pluginMetrics.counter(EXPORT_JOB_FAILURE_COUNT)).thenReturn(exportJobFailureCounter);
+        when(pluginMetrics.counter(EXPORT_S3_OBJECTS_TOTAL_COUNT)).thenReturn(exportS3ObjectsTotalCounter);
+
         exportScheduler = createObjectUnderTest();
     }
 
@@ -87,7 +106,8 @@ void test_given_no_export_partition_then_not_export() throws InterruptedException {
         Thread.sleep(100);
         executorService.shutdownNow();
 
-        verifyNoInteractions(snapshotManager, exportTaskManager, s3Client);
+        verifyNoInteractions(snapshotManager, exportTaskManager, s3Client, exportJobSuccessCounter,
+                exportJobFailureCounter, exportS3ObjectsTotalCounter);
     }
 
     @Test
@@ -123,9 +143,11 @@ void test_given_export_partition_and_export_task_id_then_complete_export() throws InterruptedException {
                 any(String.class), any(String.class), any(List.class));
         verify(sourceCoordinator).createPartition(any(DataFilePartition.class));
         verify(sourceCoordinator).completePartition(exportPartition);
+        verify(exportJobSuccessCounter).increment();
+        verify(exportS3ObjectsTotalCounter).increment(1);
+        verify(exportJobFailureCounter, never()).increment();
     }
-
     @Test
     void test_given_export_partition_without_export_task_id_then_start_and_complete_export() throws InterruptedException {
         when(sourceCoordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE)).thenReturn(Optional.of(exportPartition));
@@ -184,6 +206,59 @@ void test_given_export_partition_without_export_task_id_then_start_and_complete_export() throws InterruptedException {
                 any(String.class), any(String.class), any(List.class));
         verify(sourceCoordinator).createPartition(any(DataFilePartition.class));
         verify(sourceCoordinator).completePartition(exportPartition);
+        verify(exportJobSuccessCounter).increment();
+        verify(exportS3ObjectsTotalCounter).increment(1);
+        verify(exportJobFailureCounter, never()).increment();
+    }
+
+    @Test
+    void test_given_export_partition_and_null_export_task_id_then_close_partition_with_error() throws InterruptedException {
+        when(sourceCoordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE)).thenReturn(Optional.of(exportPartition));
+        when(exportPartition.getPartitionKey()).thenReturn(UUID.randomUUID().toString());
+        when(exportProgressState.getExportTaskId()).thenReturn(null);
+        when(exportPartition.getProgressState()).thenReturn(Optional.of(exportProgressState));
+        final String dbIdentifier = UUID.randomUUID().toString();
+        when(exportPartition.getDbIdentifier()).thenReturn(dbIdentifier);
+
+        // Mock snapshot response
+        final String snapshotId = UUID.randomUUID().toString();
+        final String snapshotArn = "arn:aws:rds:us-east-1:123456789012:snapshot:" + snapshotId;
+        final Instant createTime = Instant.now();
+        final SnapshotInfo snapshotInfoWhenCreate = new SnapshotInfo(
+                snapshotId, snapshotArn, createTime, SnapshotStatus.CREATING.getStatusName());
+        final SnapshotInfo snapshotInfoWhenComplete = new SnapshotInfo(
+                snapshotId, snapshotArn, createTime, SnapshotStatus.AVAILABLE.getStatusName());
+        when(snapshotManager.createSnapshot(dbIdentifier)).thenReturn(snapshotInfoWhenCreate);
+        when(snapshotManager.checkSnapshotStatus(snapshotId)).thenReturn(snapshotInfoWhenComplete);
+
+        // Mock export response
+        when(exportProgressState.getIamRoleArn()).thenReturn(UUID.randomUUID().toString());
+        when(exportProgressState.getBucket()).thenReturn(UUID.randomUUID().toString());
+        when(exportProgressState.getPrefix()).thenReturn(UUID.randomUUID().toString());
+        when(exportProgressState.getKmsKeyId()).thenReturn(UUID.randomUUID().toString());
+        when(exportTaskManager.startExportTask(any(String.class), any(String.class), any(String.class),
+                any(String.class), any(String.class), any(List.class))).thenReturn(null);
+
+        // Act
+        final ExecutorService executorService = Executors.newSingleThreadExecutor();
+        executorService.submit(exportScheduler);
+        await().atMost(Duration.ofSeconds(1))
+                .untilAsserted(() -> verify(sourceCoordinator).acquireAvailablePartition(ExportPartition.PARTITION_TYPE));
+        Thread.sleep(200);
+        executorService.shutdownNow();
+
+        // Assert
+        verify(snapshotManager).createSnapshot(dbIdentifier);
+        verify(exportTaskManager).startExportTask(
+                any(String.class), any(String.class), any(String.class),
+                any(String.class), any(String.class), any(List.class));
+        verify(sourceCoordinator).closePartition(exportPartition, DEFAULT_CLOSE_DURATION, DEFAULT_MAX_CLOSE_COUNT);
+        verify(sourceCoordinator, never()).createPartition(any(DataFilePartition.class));
+        verify(sourceCoordinator, never()).completePartition(exportPartition);
+
+        verify(exportJobFailureCounter).increment();
+        verify(exportJobSuccessCounter, never()).increment();
+        verify(exportS3ObjectsTotalCounter, never()).increment(1);
     }
 
     @Test
@@ -193,7 +268,8 @@ void test_shutDown() {
         final ExecutorService executorService = Executors.newSingleThreadExecutor();
         executorService.submit(exportScheduler);
 
         exportScheduler.shutdown();
-        verifyNoMoreInteractions(sourceCoordinator, snapshotManager, exportTaskManager, s3Client);
+        verifyNoMoreInteractions(sourceCoordinator, snapshotManager, exportTaskManager, s3Client,
+                exportJobSuccessCounter, exportJobFailureCounter, exportS3ObjectsTotalCounter);
         executorService.shutdownNow();
     }
 
diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java
index 406a89cec9..30f622c5d7 100644
--- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java
+++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java
@@ -14,6 +14,7 @@
 import org.mockito.Answers;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.buffer.Buffer;
 import org.opensearch.dataprepper.model.event.Event;
 import org.opensearch.dataprepper.model.record.Record;
@@ -33,6 +34,9 @@ class BinlogEventListenerTest {
     @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private RdsSourceConfig sourceConfig;
 
+    @Mock
+    private PluginMetrics pluginMetrics;
+
     @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private com.github.shyiko.mysql.binlog.event.Event binlogEvent;
 
@@ -87,6 +91,6 @@ void test_given_DeleteRows_event_then_calls_correct_handler(EventType eventType) {
     }
 
     private BinlogEventListener createObjectUnderTest() {
-        return new BinlogEventListener(buffer, sourceConfig);
+        return new BinlogEventListener(buffer, sourceConfig, pluginMetrics);
     }
 }
\ No newline at end of file
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index b82aa23a4f..a4413138c9 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.8-bin.zip
 networkTimeout=10000
 validateDistributionUrl=true
 zipStoreBase=GRADLE_USER_HOME
diff --git a/gradlew b/gradlew
index 1aa94a4269..b740cf1339 100755
--- a/gradlew
+++ b/gradlew
@@ -55,7 +55,7 @@
 #       Darwin, MinGW, and NonStop.
 #
 #   (3) This script is generated from the Groovy template
-#       https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
+#       https://github.com/gradle/gradle/blob/HEAD/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
 #       within the Gradle project.
 #
 #       You can find Gradle at https://github.com/gradle/gradle/.