Skip to content

Commit

Permalink
Revert "Run tests on the current JVM for Java 17 & 21 / Gradle 8.8 (#…
Browse files Browse the repository at this point in the history
…4730)"

This reverts commit 67f3595.

Signed-off-by: Hai Yan <oeyh@amazon.com>
  • Loading branch information
oeyh committed Jul 23, 2024
1 parent 427bb78 commit 51be01b
Show file tree
Hide file tree
Showing 12 changed files with 50 additions and 72 deletions.
3 changes: 0 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,6 @@ subprojects {

test {
useJUnitPlatform()
javaLauncher = javaToolchains.launcherFor {
languageVersion = JavaLanguageVersion.current()
}
reports {
junitXml.required
html.required
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public abstract class AbstractSink<T extends Record<?>> implements Sink<T> {
private Thread retryThread;
private int maxRetries;
private int waitTimeMs;
private SinkThread sinkThread;

public AbstractSink(final PluginSetting pluginSetting, int numRetries, int waitTimeMs) {
this.pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting);
Expand All @@ -52,8 +51,7 @@ public void initialize() {
// the exceptions which are not retryable.
doInitialize();
if (!isReady() && retryThread == null) {
sinkThread = new SinkThread(this, maxRetries, waitTimeMs);
retryThread = new Thread(sinkThread);
retryThread = new Thread(new SinkThread(this, maxRetries, waitTimeMs));
retryThread.start();
}
}
Expand All @@ -78,7 +76,7 @@ public void output(Collection<T> records) {
@Override
public void shutdown() {
if (retryThread != null) {
sinkThread.stop();
retryThread.stop();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ class SinkThread implements Runnable {
private int maxRetries;
private int waitTimeMs;

private volatile boolean isStopped = false;

public SinkThread(AbstractSink sink, int maxRetries, int waitTimeMs) {
this.sink = sink;
this.maxRetries = maxRetries;
Expand All @@ -21,15 +19,11 @@ public SinkThread(AbstractSink sink, int maxRetries, int waitTimeMs) {
@Override
public void run() {
int numRetries = 0;
while (!sink.isReady() && numRetries++ < maxRetries && !isStopped) {
while (!sink.isReady() && numRetries++ < maxRetries) {
try {
Thread.sleep(waitTimeMs);
sink.doInitialize();
} catch (InterruptedException e){}
}
}

public void stop() {
isStopped = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,15 @@
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.metrics.MetricsTestUtil;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.EventHandle;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;

import java.time.Duration;
import java.util.Arrays;
Expand All @@ -25,12 +30,6 @@
import java.util.UUID;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class AbstractSinkTest {
private int count;
Expand Down Expand Up @@ -72,13 +71,13 @@ void testMetrics() {
}

@Test
void testSinkNotReady() throws InterruptedException {
void testSinkNotReady() {
final String sinkName = "testSink";
final String pipelineName = "pipelineName";
MetricsTestUtil.initMetrics();
PluginSetting pluginSetting = new PluginSetting(sinkName, Collections.emptyMap());
pluginSetting.setPipelineName(pipelineName);
AbstractSinkNotReadyImpl abstractSink = new AbstractSinkNotReadyImpl(pluginSetting);
AbstractSink<Record<String>> abstractSink = new AbstractSinkNotReadyImpl(pluginSetting);
abstractSink.initialize();
assertEquals(abstractSink.isReady(), false);
assertEquals(abstractSink.getRetryThreadState(), Thread.State.RUNNABLE);
Expand All @@ -88,10 +87,7 @@ void testSinkNotReady() throws InterruptedException {
await().atMost(Duration.ofSeconds(5))
.until(abstractSink::isReady);
assertEquals(abstractSink.getRetryThreadState(), Thread.State.TERMINATED);
int initCountBeforeShutdown = abstractSink.initCount;
abstractSink.shutdown();
Thread.sleep(200);
assertThat(abstractSink.initCount, equalTo(initCountBeforeShutdown));
}

@Test
Expand Down
3 changes: 3 additions & 0 deletions data-prepper-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ dependencies {
exclude group: 'commons-logging', module: 'commons-logging'
}
implementation 'software.amazon.cloudwatchlogs:aws-embedded-metrics:2.0.0-beta-1'
testImplementation 'org.apache.logging.log4j:log4j-jpl:2.23.0'
testImplementation testLibs.spring.test
implementation libs.armeria.core
implementation libs.armeria.grpc
Expand Down Expand Up @@ -88,6 +89,8 @@ task integrationTest(type: Test) {

classpath = sourceSets.integrationTest.runtimeClasspath

systemProperty 'log4j.configurationFile', 'src/test/resources/log4j2.properties'

filter {
includeTestsMatching '*IT'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Stream;

Expand Down Expand Up @@ -218,7 +218,7 @@ static class SomeUnknownTypesArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
return Stream.of(
arguments(Timer.class),
arguments(Random.class),
arguments(InputStream.class),
arguments(File.class)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ public Stream<? extends Arguments> provideArguments(final ExtensionContext conte
return Stream.of(
Arguments.of(0, randomInt + 1, 0.0),
Arguments.of(1, 100, 1.0),
Arguments.of(randomInt + 1, randomInt + 1, 100.0),
Arguments.of(randomInt, randomInt, 100.0),
Arguments.of(randomInt, randomInt + 250, ((double) randomInt / (randomInt + 250)) * 100),
Arguments.of(6, 9, 66.66666666666666),
Arguments.of(531, 1000, 53.1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,9 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;

import org.mockito.Mock;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
Expand All @@ -31,7 +28,6 @@
import java.io.ByteArrayInputStream;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.LinkedList;
import java.util.Map;
Expand Down Expand Up @@ -60,7 +56,7 @@ public EventJsonInputCodec createInputCodec() {
@ParameterizedTest
@ValueSource(strings = {"", "{}"})
public void emptyTest(String input) throws Exception {
input = "{\"" + EventJsonDefines.VERSION + "\":\"" + DataPrepperVersion.getCurrentVersion().toString() + "\", \"" + EventJsonDefines.EVENTS + "\":[" + input + "]}";
input = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\", \""+EventJsonDefines.EVENTS+"\":["+input+"]}";
ByteArrayInputStream inputStream = new ByteArrayInputStream(input.getBytes());
inputCodec = createInputCodec();
Consumer<Record<Event>> consumer = mock(Consumer.class);
Expand All @@ -74,15 +70,15 @@ public void inCompatibleVersionTest() throws Exception {
final String key = UUID.randomUUID().toString();
final String value = UUID.randomUUID().toString();
Map<String, Object> data = Map.of(key, value);
Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS);
Instant startTime = Instant.now();
Event event = createEvent(data, startTime);

Map<String, Object> dataMap = event.toMap();
Map<String, Object> metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class);
String input = "{\"" + EventJsonDefines.VERSION + "\":\"3.0\", \"" + EventJsonDefines.EVENTS + "\":[";
String input = "{\""+EventJsonDefines.VERSION+"\":\"3.0\", \""+EventJsonDefines.EVENTS+"\":[";
String comma = "";
for (int i = 0; i < 2; i++) {
input += comma + "{\"data\":" + objectMapper.writeValueAsString(dataMap) + "," + "\"metadata\":" + objectMapper.writeValueAsString(metadataMap) + "}";
input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}";
comma = ",";
}
input += "]}";
Expand All @@ -99,24 +95,24 @@ public void basicTest() throws Exception {
final String key = UUID.randomUUID().toString();
final String value = UUID.randomUUID().toString();
Map<String, Object> data = Map.of(key, value);
Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS);
Instant startTime = Instant.now();
Event event = createEvent(data, startTime);

Map<String, Object> dataMap = event.toMap();
Map<String, Object> metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class);
String input = "{\"" + EventJsonDefines.VERSION + "\":\"" + DataPrepperVersion.getCurrentVersion().toString() + "\", \"" + EventJsonDefines.EVENTS + "\":[";
String input = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\", \""+EventJsonDefines.EVENTS+"\":[";
String comma = "";
for (int i = 0; i < 2; i++) {
input += comma + "{\"data\":" + objectMapper.writeValueAsString(dataMap) + "," + "\"metadata\":" + objectMapper.writeValueAsString(metadataMap) + "}";
input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}";
comma = ",";
}
input += "]}";
inputStream = new ByteArrayInputStream(input.getBytes());
List<Record<Event>> records = new LinkedList<>();
inputCodec.parse(inputStream, records::add);
assertThat(records.size(), equalTo(2));
for (Record record : records) {
Event e = (Event) record.getData();
for(Record record : records) {
Event e = (Event)record.getData();
assertThat(e.get(key, String.class), equalTo(value));
assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime));
assertThat(e.getMetadata().getTags().size(), equalTo(0));
Expand All @@ -130,24 +126,24 @@ public void test_with_timeReceivedOverridden() throws Exception {
final String key = UUID.randomUUID().toString();
final String value = UUID.randomUUID().toString();
Map<String, Object> data = Map.of(key, value);
Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS).minusSeconds(5);
Instant startTime = Instant.now().minusSeconds(5);
Event event = createEvent(data, startTime);

Map<String, Object> dataMap = event.toMap();
Map<String, Object> metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class);
String input = "{\"" + EventJsonDefines.VERSION + "\":\"" + DataPrepperVersion.getCurrentVersion().toString() + "\", \"" + EventJsonDefines.EVENTS + "\":[";
String input = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\", \""+EventJsonDefines.EVENTS+"\":[";
String comma = "";
for (int i = 0; i < 2; i++) {
input += comma + "{\"data\":" + objectMapper.writeValueAsString(dataMap) + "," + "\"metadata\":" + objectMapper.writeValueAsString(metadataMap) + "}";
input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}";
comma = ",";
}
input += "]}";
inputStream = new ByteArrayInputStream(input.getBytes());
List<Record<Event>> records = new LinkedList<>();
inputCodec.parse(inputStream, records::add);
assertThat(records.size(), equalTo(2));
for (Record record : records) {
Event e = (Event) record.getData();
for(Record record : records) {
Event e = (Event)record.getData();
assertThat(e.get(key, String.class), equalTo(value));
assertThat(e.getMetadata().getTimeReceived(), not(equalTo(startTime)));
assertThat(e.getMetadata().getTags().size(), equalTo(0));
Expand All @@ -163,7 +159,7 @@ private Event createEvent(final Map<String, Object> json, final Instant timeRece
if (timeReceived != null) {
logBuilder.withTimeReceived(timeReceived);
}
final JacksonEvent event = (JacksonEvent) logBuilder.build();
final JacksonEvent event = (JacksonEvent)logBuilder.build();

return event;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,9 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;

import org.mockito.Mock;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

Expand All @@ -25,7 +22,6 @@
import org.opensearch.dataprepper.model.log.JacksonLog;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.LinkedList;
import java.util.Map;
Expand Down Expand Up @@ -68,7 +64,7 @@ public void basicTest() throws Exception {
final String value = UUID.randomUUID().toString();
Map<String, Object> data = Map.of(key, value);

Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS);
Instant startTime = Instant.now();
Event event = createEvent(data, startTime);
outputCodec = createOutputCodec();
inputCodec = createInputCodec();
Expand All @@ -79,8 +75,8 @@ public void basicTest() throws Exception {
inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add);

assertThat(records.size(), equalTo(1));
for (Record record : records) {
Event e = (Event) record.getData();
for(Record record : records) {
Event e = (Event)record.getData();
assertThat(e.get(key, String.class), equalTo(value));
assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime));
assertThat(e.getMetadata().getTags().size(), equalTo(0));
Expand All @@ -94,7 +90,7 @@ public void multipleEventsTest() throws Exception {
final String value = UUID.randomUUID().toString();
Map<String, Object> data = Map.of(key, value);

Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS);
Instant startTime = Instant.now();
Event event = createEvent(data, startTime);
outputCodec = createOutputCodec();
inputCodec = createInputCodec();
Expand All @@ -107,8 +103,8 @@ public void multipleEventsTest() throws Exception {
inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add);

assertThat(records.size(), equalTo(3));
for (Record record : records) {
Event e = (Event) record.getData();
for(Record record : records) {
Event e = (Event)record.getData();
assertThat(e.get(key, String.class), equalTo(value));
assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime));
assertThat(e.getMetadata().getTags().size(), equalTo(0));
Expand All @@ -126,7 +122,7 @@ public void extendedTest() throws Exception {

Set<String> tags = Set.of(UUID.randomUUID().toString(), UUID.randomUUID().toString());
List<String> tagsList = tags.stream().collect(Collectors.toList());
Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS);
Instant startTime = Instant.now();
Event event = createEvent(data, startTime);
Instant origTime = startTime.minusSeconds(5);
event.getMetadata().setExternalOriginationTime(origTime);
Expand All @@ -139,11 +135,11 @@ public void extendedTest() throws Exception {
outputCodec.complete(outputStream);
assertThat(outputCodec.getExtension(), equalTo(EventJsonOutputCodec.EVENT_JSON));
List<Record<Event>> records = new LinkedList<>();
inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add);
inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add);

assertThat(records.size(), equalTo(1));
for (Record record : records) {
Event e = (Event) record.getData();
for(Record record : records) {
Event e = (Event)record.getData();
assertThat(e.get(key, String.class), equalTo(value));
assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime));
assertThat(e.getMetadata().getTags(), equalTo(tags));
Expand All @@ -161,7 +157,7 @@ private Event createEvent(final Map<String, Object> json, final Instant timeRece
if (timeReceived != null) {
logBuilder.withTimeReceived(timeReceived);
}
final JacksonEvent event = (JacksonEvent) logBuilder.build();
final JacksonEvent event = (JacksonEvent)logBuilder.build();

return event;
}
Expand Down
Loading

0 comments on commit 51be01b

Please sign in to comment.