Skip to content

Commit

Permalink
Using Awaitility and mocks in the LogGeneratorSourceTest to attempt t…
Browse files Browse the repository at this point in the history
…o improve reliability. (#4746)

Signed-off-by: David Venable <dlv@amazon.com>
  • Loading branch information
dlvenable committed Aug 2, 2024
1 parent ecc3bfb commit e22e969
Showing 1 changed file with 32 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,37 @@
package org.opensearch.dataprepper.plugins.source.loggenerator;


import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.source.loggenerator.LogGeneratorSourceConfig.INFINITE_LOG_COUNT;

Expand All @@ -50,6 +51,9 @@ public class LogGeneratorSourceTest {
private LogGeneratorSource logGeneratorSource;
@Mock
private PluginModel mockLogPluginModel;
@Mock
private Buffer buffer;

@BeforeEach
public void setup() {
when(sourceConfig.getLogType()).thenReturn(mockLogPluginModel);
Expand Down Expand Up @@ -79,42 +83,45 @@ private LogGeneratorSource createObjectUnderTest() {

@Test
void GIVEN_logGeneratorSourceAndBlockingBuffer_WHEN_noLimit_THEN_keepsWritingToBufferUntilStopped()
throws InterruptedException, TimeoutException {
throws TimeoutException {
logGeneratorSource = createObjectUnderTest();

BlockingBuffer<Record<Event>> spyBuffer = spy(new BlockingBuffer<Record<Event>>("SamplePipeline"));

Duration interval = Duration.ofMillis(100);

lenient().when(sourceConfig.getInterval()).thenReturn(interval);
lenient().when(sourceConfig.getCount()).thenReturn(INFINITE_LOG_COUNT); // no limit to log count

logGeneratorSource.start(spyBuffer);
Thread.sleep((long) (interval.toMillis() * 1.5));
verify(spyBuffer, atLeast(1)).write(any(Record.class), anyInt());
Thread.sleep((long) (interval.toMillis() * 0.7));
verify(spyBuffer, atLeast(2)).write(any(Record.class), anyInt());
logGeneratorSource.start(buffer);
await()
.atMost((long) (interval.toMillis() * 1.5), TimeUnit.MILLISECONDS)
.untilAsserted(() -> verify(buffer, atLeast(1)).write(any(Record.class), anyInt()));
verify(buffer, atLeast(1)).write(any(Record.class), anyInt());
await()
.atMost((long) (interval.toMillis() * 1.5), TimeUnit.MILLISECONDS)
.untilAsserted(() -> verify(buffer, atLeast(1)).write(any(Record.class), anyInt()));
verify(buffer, atLeast(2)).write(any(Record.class), anyInt());
}

@Test
void GIVEN_logGeneratorSourceAndBlockingBuffer_WHEN_reachedLimit_THEN_stopsWritingToBuffer()
throws InterruptedException, TimeoutException {
logGeneratorSource = createObjectUnderTest();

BlockingBuffer<Record<Event>> spyBuffer = spy(new BlockingBuffer<Record<Event>>("SamplePipeline"));

Duration interval = Duration.ofMillis(100);

lenient().when(sourceConfig.getInterval()).thenReturn(interval);
lenient().when(sourceConfig.getCount()).thenReturn(1); // max log count of 1 in logGeneratorSource

assertEquals(spyBuffer.isEmpty(), true);
logGeneratorSource.start(spyBuffer);
Thread.sleep((long) (interval.toMillis() * 1.1));
verifyNoInteractions(buffer);

verify(spyBuffer, times(1)).write(any(Record.class), anyInt());
logGeneratorSource.start(buffer);

Thread.sleep(interval.toMillis());
verify(spyBuffer, times(1)).write(any(Record.class), anyInt());
await()
.atMost(interval.multipliedBy(3))
.untilAsserted(() -> verify(buffer, atLeast(1)).write(any(Record.class), anyInt()));
verify(buffer, times(1)).write(any(Record.class), anyInt());

Thread.sleep((long) (interval.toMillis() * 1.1));
verify(buffer, times(1)).write(any(Record.class), anyInt());
}
}

0 comments on commit e22e969

Please sign in to comment.