Skip to content

Commit

Permalink
Fixed build erros
Browse files Browse the repository at this point in the history
Signed-off-by: Kondaka <krishkdk@amazon.com>
  • Loading branch information
kkondaka committed Sep 16, 2024
1 parent e1f8fcb commit 30b46c6
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ void processSqsMessages_should_return_at_least_one_message_with_acks_with_callba
assertThat(s3ObjectReference.getBucketName(), equalTo(bucket));
assertThat(s3ObjectReference.getKey(), startsWith("s3 source/sqs/"));
event = (Event)JacksonEvent.fromMessage(val.getArgument(0).toString());
ackSet.add(event);
ackSet.add(event.getEventHandle());
return null;
}).when(s3Service).addS3Object(any(S3ObjectReference.class), any(AcknowledgementSet.class));
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
Expand Down Expand Up @@ -246,7 +246,7 @@ void processSqsMessages_should_return_at_least_one_message_with_acks_with_callba
assertThat(s3ObjectReference.getKey(), startsWith("s3 source/sqs/"));
event = (Event)JacksonEvent.fromMessage(val.getArgument(0).toString());

ackSet.add(event);
ackSet.add(event.getEventHandle());
synchronized(this) {
ready.set(true);
this.notify();
Expand Down Expand Up @@ -325,7 +325,7 @@ void processSqsMessages_with_acks_and_progress_check_callbacks(final int numberO
assertThat(s3ObjectReference.getKey(), startsWith("s3 source/sqs/"));
event = (Event)JacksonEvent.fromMessage(val.getArgument(0).toString());

ackSet.add(event);
ackSet.add(event.getEventHandle());
synchronized(events) {
events.add(event);
}
Expand Down Expand Up @@ -400,7 +400,7 @@ void processSqsMessages_with_acks_and_progress_check_callbacks_expires(final int
assertThat(s3ObjectReference.getKey(), startsWith("s3 source/sqs/"));
event = (Event)JacksonEvent.fromMessage(val.getArgument(0).toString());

ackSet.add(event);
ackSet.add(event.getEventHandle());
synchronized(events) {
events.add(event);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.dataprepper.model.codec.DecompressionEngine;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.io.InputFile;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
Expand Down Expand Up @@ -99,6 +100,8 @@ class S3ObjectWorkerTest {
@Mock
private Counter s3ObjectsSucceededCounter;
@Mock
private Counter s3ObjectsNoRecordFoundCounter;
@Mock
private Counter s3ObjectNoRecordsFound;
@Mock
private Timer s3ObjectReadTimer;
Expand All @@ -123,6 +126,8 @@ class S3ObjectWorkerTest {
private Exception exceptionThrownByCallable;
private Random random;
private long objectSize;
private Event event;
private EventHandle eventHandle;
@Mock
private S3ObjectPluginMetrics s3ObjectPluginMetrics;
@Mock
Expand All @@ -133,6 +138,9 @@ class S3ObjectWorkerTest {
@BeforeEach
void setUp() throws Exception {
random = new Random();
event = mock(Event.class);
eventHandle = mock(EventHandle.class);
lenient().when(event.getEventHandle()).thenReturn(eventHandle);
bufferTimeout = Duration.ofMillis(random.nextInt(100) + 100);
recordsToAccumulate = random.nextInt(10) + 2;

Expand Down Expand Up @@ -180,6 +188,7 @@ void parseS3Object_calls_getObject_with_correct_GetObjectRequest() throws IOExce
when(s3Client.headObject(any(HeadObjectRequest.class))).thenReturn(headObjectResponse);
when(s3ObjectPluginMetrics.getS3ObjectEventsSummary()).thenReturn(s3ObjectEventsSummary);
when(s3ObjectPluginMetrics.getS3ObjectsSucceededCounter()).thenReturn(s3ObjectsSucceededCounter);
when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectsNoRecordFoundCounter);
when(s3ObjectPluginMetrics.getS3ObjectSizeSummary()).thenReturn(s3ObjectSizeSummary);
when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectNoRecordsFound);

Expand All @@ -196,6 +205,7 @@ void parseS3Object_calls_getObject_with_correct_GetObjectRequest_with_Acknowledg
numEventsAdded = 0;
doAnswer(a -> {
Record record = mock(Record.class);
when(record.getData()).thenReturn(event);
Consumer c = (Consumer)a.getArgument(2);
c.accept(record);
return null;
Expand All @@ -213,6 +223,7 @@ void parseS3Object_calls_getObject_with_correct_GetObjectRequest_when_bucketOwne
when(s3ObjectPluginMetrics.getS3ObjectEventsSummary()).thenReturn(s3ObjectEventsSummary);
when(s3ObjectPluginMetrics.getS3ObjectsSucceededCounter()).thenReturn(s3ObjectsSucceededCounter);
when(s3ObjectPluginMetrics.getS3ObjectSizeSummary()).thenReturn(s3ObjectSizeSummary);
when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectsNoRecordFoundCounter);
when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectNoRecordsFound);
createObjectUnderTest(s3ObjectPluginMetrics).parseS3Object(s3ObjectReference, acknowledgementSet, null, null);
}
Expand All @@ -223,6 +234,7 @@ void parseS3Object_calls_Codec_parse_on_S3InputStream() throws Exception {
when(s3ObjectPluginMetrics.getS3ObjectEventsSummary()).thenReturn(s3ObjectEventsSummary);
when(s3ObjectPluginMetrics.getS3ObjectsSucceededCounter()).thenReturn(s3ObjectsSucceededCounter);
when(s3ObjectPluginMetrics.getS3ObjectSizeSummary()).thenReturn(s3ObjectSizeSummary);
when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectsNoRecordFoundCounter);
when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectNoRecordsFound);

createObjectUnderTest(s3ObjectPluginMetrics).parseS3Object(s3ObjectReference, acknowledgementSet, null, null);
Expand Down Expand Up @@ -255,6 +267,7 @@ void parseS3Object_calls_Codec_parse_with_Consumer_that_adds_to_BufferAccumulato
when(s3Client.headObject(any(HeadObjectRequest.class))).thenReturn(headObjectResponse);
when(s3ObjectPluginMetrics.getS3ObjectEventsSummary()).thenReturn(s3ObjectEventsSummary);
when(s3ObjectPluginMetrics.getS3ObjectsSucceededCounter()).thenReturn(s3ObjectsSucceededCounter);
when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectsNoRecordFoundCounter);
when(s3ObjectPluginMetrics.getS3ObjectSizeSummary()).thenReturn(s3ObjectSizeSummary);
when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectNoRecordsFound);

Expand Down Expand Up @@ -286,6 +299,7 @@ void parseS3Object_calls_Codec_parse_with_Consumer_that_adds_to_BufferAccumulato
when(s3Client.headObject(any(HeadObjectRequest.class))).thenReturn(headObjectResponse);
when(s3ObjectPluginMetrics.getS3ObjectEventsSummary()).thenReturn(s3ObjectEventsSummary);
when(s3ObjectPluginMetrics.getS3ObjectsSucceededCounter()).thenReturn(s3ObjectsSucceededCounter);
when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectsNoRecordFoundCounter);
when(s3ObjectPluginMetrics.getS3ObjectSizeSummary()).thenReturn(s3ObjectSizeSummary);
when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectNoRecordsFound);

Expand Down Expand Up @@ -319,6 +333,7 @@ void parseS3Object_calls_BufferAccumulator_flush_after_Codec_parse() throws Exce
when(s3Client.headObject(any(HeadObjectRequest.class))).thenReturn(headObjectResponse);
when(s3ObjectPluginMetrics.getS3ObjectEventsSummary()).thenReturn(s3ObjectEventsSummary);
when(s3ObjectPluginMetrics.getS3ObjectsSucceededCounter()).thenReturn(s3ObjectsSucceededCounter);
when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectsNoRecordFoundCounter);
when(s3ObjectPluginMetrics.getS3ObjectSizeSummary()).thenReturn(s3ObjectSizeSummary);
when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectNoRecordsFound);

Expand All @@ -341,6 +356,7 @@ void parseS3Object_increments_success_counter_after_parsing_S3_object() throws I
when(s3ObjectPluginMetrics.getS3ObjectReadTimer()).thenReturn(s3ObjectReadTimer);
when(s3ObjectPluginMetrics.getS3ObjectEventsSummary()).thenReturn(s3ObjectEventsSummary);
when(s3ObjectPluginMetrics.getS3ObjectsSucceededCounter()).thenReturn(s3ObjectsSucceededCounter);
when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectsNoRecordFoundCounter);
when(s3ObjectPluginMetrics.getS3ObjectSizeSummary()).thenReturn(s3ObjectSizeSummary);
when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectNoRecordsFound);

Expand Down Expand Up @@ -394,6 +410,7 @@ void parseS3Object_calls_HeadObject_after_Callable() throws Exception {
when(s3Client.headObject(any(HeadObjectRequest.class))).thenReturn(headObjectResponse);
when(s3ObjectPluginMetrics.getS3ObjectEventsSummary()).thenReturn(s3ObjectEventsSummary);
when(s3ObjectPluginMetrics.getS3ObjectsSucceededCounter()).thenReturn(s3ObjectsSucceededCounter);
when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectsNoRecordFoundCounter);
when(s3ObjectPluginMetrics.getS3ObjectSizeSummary()).thenReturn(s3ObjectSizeSummary);
when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectNoRecordsFound);

Expand All @@ -412,6 +429,7 @@ void parseS3Object_records_BufferAccumulator_getTotalWritten() throws IOExceptio

when(s3ObjectPluginMetrics.getS3ObjectEventsSummary()).thenReturn(s3ObjectEventsSummary);
when(s3ObjectPluginMetrics.getS3ObjectsSucceededCounter()).thenReturn(s3ObjectsSucceededCounter);
//when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectsNoRecordFoundCounter);
when(s3ObjectPluginMetrics.getS3ObjectSizeSummary()).thenReturn(s3ObjectSizeSummary);

final int totalWritten = new Random().nextInt(10_000) + 5_000;
Expand All @@ -431,6 +449,7 @@ void parseS3Object_records_S3_ObjectSize() throws IOException {
when(s3Client.headObject(any(HeadObjectRequest.class))).thenReturn(headObjectResponse);
when(s3ObjectPluginMetrics.getS3ObjectEventsSummary()).thenReturn(s3ObjectEventsSummary);
when(s3ObjectPluginMetrics.getS3ObjectsSucceededCounter()).thenReturn(s3ObjectsSucceededCounter);
when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectsNoRecordFoundCounter);
when(s3ObjectPluginMetrics.getS3ObjectSizeSummary()).thenReturn(s3ObjectSizeSummary);
when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectNoRecordsFound);

Expand All @@ -444,6 +463,7 @@ void parseS3Object_records_S3_Object_No_Records_Found() throws IOException {
when(s3Client.headObject(any(HeadObjectRequest.class))).thenReturn(headObjectResponse);
when(s3ObjectPluginMetrics.getS3ObjectEventsSummary()).thenReturn(s3ObjectEventsSummary);
when(s3ObjectPluginMetrics.getS3ObjectsSucceededCounter()).thenReturn(s3ObjectsSucceededCounter);
when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectsNoRecordFoundCounter);
when(s3ObjectPluginMetrics.getS3ObjectSizeSummary()).thenReturn(s3ObjectSizeSummary);
when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectNoRecordsFound);

Expand All @@ -470,6 +490,7 @@ void parseS3Object_records_input_file_bytes_read() throws IOException {
when(s3ObjectPluginMetrics.getS3ObjectEventsSummary()).thenReturn(s3ObjectEventsSummary);
when(s3ObjectPluginMetrics.getS3ObjectSizeProcessedSummary()).thenReturn(s3ObjectSizeProcessedSummary);
when(s3ObjectPluginMetrics.getS3ObjectsSucceededCounter()).thenReturn(s3ObjectsSucceededCounter);
when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectsNoRecordFoundCounter);
when(s3ObjectPluginMetrics.getS3ObjectSizeSummary()).thenReturn(s3ObjectSizeSummary);
when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectNoRecordsFound);

Expand Down

0 comments on commit 30b46c6

Please sign in to comment.