Skip to content

Commit

Permalink
Reapply "Run tests on the current JVM for Java 17 & 21 / Gradle 8.8 (o…
Browse files Browse the repository at this point in the history
…pensearch-project#4730)" (opensearch-project#4762) (opensearch-project#4771)

This reverts commit 5c7d58c.

Signed-off-by: David Venable <dlv@amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
  • Loading branch information
dlvenable authored and Krishna Kondaka committed Aug 12, 2024
1 parent 30f42b0 commit fc756c9
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 50 deletions.
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ subprojects {

test {
useJUnitPlatform()
javaLauncher = javaToolchains.launcherFor {
languageVersion = JavaLanguageVersion.current()
}
reports {
junitXml.required
html.required
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public abstract class AbstractSink<T extends Record<?>> implements Sink<T> {
private Thread retryThread;
private int maxRetries;
private int waitTimeMs;
private SinkThread sinkThread;

public AbstractSink(final PluginSetting pluginSetting, int numRetries, int waitTimeMs) {
this.pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting);
Expand All @@ -51,7 +52,8 @@ public void initialize() {
// the exceptions which are not retryable.
doInitialize();
if (!isReady() && retryThread == null) {
retryThread = new Thread(new SinkThread(this, maxRetries, waitTimeMs));
sinkThread = new SinkThread(this, maxRetries, waitTimeMs);
retryThread = new Thread(sinkThread);
retryThread.start();
}
}
Expand All @@ -76,7 +78,7 @@ public void output(Collection<T> records) {
@Override
public void shutdown() {
if (retryThread != null) {
retryThread.stop();
sinkThread.stop();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ class SinkThread implements Runnable {
private int maxRetries;
private int waitTimeMs;

private volatile boolean isStopped = false;

public SinkThread(AbstractSink sink, int maxRetries, int waitTimeMs) {
this.sink = sink;
this.maxRetries = maxRetries;
Expand All @@ -19,11 +21,15 @@ public SinkThread(AbstractSink sink, int maxRetries, int waitTimeMs) {
@Override
public void run() {
int numRetries = 0;
while (!sink.isReady() && numRetries++ < maxRetries) {
while (!sink.isReady() && numRetries++ < maxRetries && !isStopped) {
try {
Thread.sleep(waitTimeMs);
sink.doInitialize();
} catch (InterruptedException e){}
}
}

public void stop() {
isStopped = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,10 @@
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.metrics.MetricsTestUtil;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.event.EventHandle;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Duration;
import java.util.Arrays;
Expand All @@ -30,6 +25,12 @@
import java.util.UUID;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class AbstractSinkTest {
private int count;
Expand Down Expand Up @@ -71,13 +72,13 @@ void testMetrics() {
}

@Test
void testSinkNotReady() {
void testSinkNotReady() throws InterruptedException {
final String sinkName = "testSink";
final String pipelineName = "pipelineName";
MetricsTestUtil.initMetrics();
PluginSetting pluginSetting = new PluginSetting(sinkName, Collections.emptyMap());
pluginSetting.setPipelineName(pipelineName);
AbstractSink<Record<String>> abstractSink = new AbstractSinkNotReadyImpl(pluginSetting);
AbstractSinkNotReadyImpl abstractSink = new AbstractSinkNotReadyImpl(pluginSetting);
abstractSink.initialize();
assertEquals(abstractSink.isReady(), false);
assertEquals(abstractSink.getRetryThreadState(), Thread.State.RUNNABLE);
Expand All @@ -87,7 +88,10 @@ void testSinkNotReady() {
await().atMost(Duration.ofSeconds(5))
.until(abstractSink::isReady);
assertEquals(abstractSink.getRetryThreadState(), Thread.State.TERMINATED);
int initCountBeforeShutdown = abstractSink.initCount;
abstractSink.shutdown();
Thread.sleep(200);
assertThat(abstractSink.initCount, equalTo(initCountBeforeShutdown));
}

@Test
Expand Down
3 changes: 0 additions & 3 deletions data-prepper-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ dependencies {
exclude group: 'commons-logging', module: 'commons-logging'
}
implementation 'software.amazon.cloudwatchlogs:aws-embedded-metrics:2.0.0-beta-1'
testImplementation 'org.apache.logging.log4j:log4j-jpl:2.23.0'
testImplementation testLibs.spring.test
implementation libs.armeria.core
implementation libs.armeria.grpc
Expand Down Expand Up @@ -89,8 +88,6 @@ task integrationTest(type: Test) {

classpath = sourceSets.integrationTest.runtimeClasspath

systemProperty 'log4j.configurationFile', 'src/test/resources/log4j2.properties'

filter {
includeTestsMatching '*IT'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Timer;
import java.util.UUID;
import java.util.stream.Stream;

Expand Down Expand Up @@ -218,7 +218,7 @@ static class SomeUnknownTypesArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
return Stream.of(
arguments(Random.class),
arguments(Timer.class),
arguments(InputStream.class),
arguments(File.class)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ public Stream<? extends Arguments> provideArguments(final ExtensionContext conte
return Stream.of(
Arguments.of(0, randomInt + 1, 0.0),
Arguments.of(1, 100, 1.0),
Arguments.of(randomInt, randomInt, 100.0),
Arguments.of(randomInt + 1, randomInt + 1, 100.0),
Arguments.of(randomInt, randomInt + 250, ((double) randomInt / (randomInt + 250)) * 100),
Arguments.of(6, 9, 66.66666666666666),
Arguments.of(531, 1000, 53.1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;

import org.mockito.Mock;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
Expand All @@ -28,6 +31,7 @@
import java.io.ByteArrayInputStream;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.LinkedList;
import java.util.Map;
Expand Down Expand Up @@ -56,7 +60,7 @@ public EventJsonInputCodec createInputCodec() {
@ParameterizedTest
@ValueSource(strings = {"", "{}"})
public void emptyTest(String input) throws Exception {
input = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\", \""+EventJsonDefines.EVENTS+"\":["+input+"]}";
input = "{\"" + EventJsonDefines.VERSION + "\":\"" + DataPrepperVersion.getCurrentVersion().toString() + "\", \"" + EventJsonDefines.EVENTS + "\":[" + input + "]}";
ByteArrayInputStream inputStream = new ByteArrayInputStream(input.getBytes());
inputCodec = createInputCodec();
Consumer<Record<Event>> consumer = mock(Consumer.class);
Expand All @@ -70,15 +74,15 @@ public void inCompatibleVersionTest() throws Exception {
final String key = UUID.randomUUID().toString();
final String value = UUID.randomUUID().toString();
Map<String, Object> data = Map.of(key, value);
Instant startTime = Instant.now();
Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS);
Event event = createEvent(data, startTime);

Map<String, Object> dataMap = event.toMap();
Map<String, Object> metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class);
String input = "{\""+EventJsonDefines.VERSION+"\":\"3.0\", \""+EventJsonDefines.EVENTS+"\":[";
String input = "{\"" + EventJsonDefines.VERSION + "\":\"3.0\", \"" + EventJsonDefines.EVENTS + "\":[";
String comma = "";
for (int i = 0; i < 2; i++) {
input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}";
input += comma + "{\"data\":" + objectMapper.writeValueAsString(dataMap) + "," + "\"metadata\":" + objectMapper.writeValueAsString(metadataMap) + "}";
comma = ",";
}
input += "]}";
Expand All @@ -95,24 +99,24 @@ public void basicTest() throws Exception {
final String key = UUID.randomUUID().toString();
final String value = UUID.randomUUID().toString();
Map<String, Object> data = Map.of(key, value);
Instant startTime = Instant.now();
Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS);
Event event = createEvent(data, startTime);

Map<String, Object> dataMap = event.toMap();
Map<String, Object> metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class);
String input = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\", \""+EventJsonDefines.EVENTS+"\":[";
String input = "{\"" + EventJsonDefines.VERSION + "\":\"" + DataPrepperVersion.getCurrentVersion().toString() + "\", \"" + EventJsonDefines.EVENTS + "\":[";
String comma = "";
for (int i = 0; i < 2; i++) {
input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}";
input += comma + "{\"data\":" + objectMapper.writeValueAsString(dataMap) + "," + "\"metadata\":" + objectMapper.writeValueAsString(metadataMap) + "}";
comma = ",";
}
input += "]}";
inputStream = new ByteArrayInputStream(input.getBytes());
List<Record<Event>> records = new LinkedList<>();
inputCodec.parse(inputStream, records::add);
assertThat(records.size(), equalTo(2));
for(Record record : records) {
Event e = (Event)record.getData();
for (Record record : records) {
Event e = (Event) record.getData();
assertThat(e.get(key, String.class), equalTo(value));
assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime));
assertThat(e.getMetadata().getTags().size(), equalTo(0));
Expand All @@ -126,24 +130,24 @@ public void test_with_timeReceivedOverridden() throws Exception {
final String key = UUID.randomUUID().toString();
final String value = UUID.randomUUID().toString();
Map<String, Object> data = Map.of(key, value);
Instant startTime = Instant.now().minusSeconds(5);
Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS).minusSeconds(5);
Event event = createEvent(data, startTime);

Map<String, Object> dataMap = event.toMap();
Map<String, Object> metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class);
String input = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\", \""+EventJsonDefines.EVENTS+"\":[";
String input = "{\"" + EventJsonDefines.VERSION + "\":\"" + DataPrepperVersion.getCurrentVersion().toString() + "\", \"" + EventJsonDefines.EVENTS + "\":[";
String comma = "";
for (int i = 0; i < 2; i++) {
input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}";
input += comma + "{\"data\":" + objectMapper.writeValueAsString(dataMap) + "," + "\"metadata\":" + objectMapper.writeValueAsString(metadataMap) + "}";
comma = ",";
}
input += "]}";
inputStream = new ByteArrayInputStream(input.getBytes());
List<Record<Event>> records = new LinkedList<>();
inputCodec.parse(inputStream, records::add);
assertThat(records.size(), equalTo(2));
for(Record record : records) {
Event e = (Event)record.getData();
for (Record record : records) {
Event e = (Event) record.getData();
assertThat(e.get(key, String.class), equalTo(value));
assertThat(e.getMetadata().getTimeReceived(), not(equalTo(startTime)));
assertThat(e.getMetadata().getTags().size(), equalTo(0));
Expand All @@ -159,7 +163,7 @@ private Event createEvent(final Map<String, Object> json, final Instant timeRece
if (timeReceived != null) {
logBuilder.withTimeReceived(timeReceived);
}
final JacksonEvent event = (JacksonEvent)logBuilder.build();
final JacksonEvent event = (JacksonEvent) logBuilder.build();

return event;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;

import org.mockito.Mock;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

Expand All @@ -22,6 +25,7 @@
import org.opensearch.dataprepper.model.log.JacksonLog;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.LinkedList;
import java.util.Map;
Expand Down Expand Up @@ -64,7 +68,7 @@ public void basicTest() throws Exception {
final String value = UUID.randomUUID().toString();
Map<String, Object> data = Map.of(key, value);

Instant startTime = Instant.now();
Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS);
Event event = createEvent(data, startTime);
outputCodec = createOutputCodec();
inputCodec = createInputCodec();
Expand All @@ -75,8 +79,8 @@ public void basicTest() throws Exception {
inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add);

assertThat(records.size(), equalTo(1));
for(Record record : records) {
Event e = (Event)record.getData();
for (Record record : records) {
Event e = (Event) record.getData();
assertThat(e.get(key, String.class), equalTo(value));
assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime));
assertThat(e.getMetadata().getTags().size(), equalTo(0));
Expand All @@ -90,7 +94,7 @@ public void multipleEventsTest() throws Exception {
final String value = UUID.randomUUID().toString();
Map<String, Object> data = Map.of(key, value);

Instant startTime = Instant.now();
Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS);
Event event = createEvent(data, startTime);
outputCodec = createOutputCodec();
inputCodec = createInputCodec();
Expand All @@ -103,8 +107,8 @@ public void multipleEventsTest() throws Exception {
inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add);

assertThat(records.size(), equalTo(3));
for(Record record : records) {
Event e = (Event)record.getData();
for (Record record : records) {
Event e = (Event) record.getData();
assertThat(e.get(key, String.class), equalTo(value));
assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime));
assertThat(e.getMetadata().getTags().size(), equalTo(0));
Expand All @@ -122,7 +126,7 @@ public void extendedTest() throws Exception {

Set<String> tags = Set.of(UUID.randomUUID().toString(), UUID.randomUUID().toString());
List<String> tagsList = tags.stream().collect(Collectors.toList());
Instant startTime = Instant.now();
Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS);
Event event = createEvent(data, startTime);
Instant origTime = startTime.minusSeconds(5);
event.getMetadata().setExternalOriginationTime(origTime);
Expand All @@ -135,11 +139,11 @@ public void extendedTest() throws Exception {
outputCodec.complete(outputStream);
assertThat(outputCodec.getExtension(), equalTo(EventJsonOutputCodec.EVENT_JSON));
List<Record<Event>> records = new LinkedList<>();
inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add);
inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add);

assertThat(records.size(), equalTo(1));
for(Record record : records) {
Event e = (Event)record.getData();
for (Record record : records) {
Event e = (Event) record.getData();
assertThat(e.get(key, String.class), equalTo(value));
assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime));
assertThat(e.getMetadata().getTags(), equalTo(tags));
Expand All @@ -157,7 +161,7 @@ private Event createEvent(final Map<String, Object> json, final Instant timeRece
if (timeReceived != null) {
logBuilder.withTimeReceived(timeReceived);
}
final JacksonEvent event = (JacksonEvent)logBuilder.build();
final JacksonEvent event = (JacksonEvent) logBuilder.build();

return event;
}
Expand Down
Loading

0 comments on commit fc756c9

Please sign in to comment.