diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java index 846c4c3011..1f36e01e25 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java @@ -322,6 +322,7 @@ public static boolean isValidFormatExpressions(final String format, final Expres if (Objects.isNull(expressionEvaluator)) { return false; } + int fromIndex = 0; int position = 0; while ((position = format.indexOf("${", fromIndex)) != -1) { diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java index 7c5c4bee56..a92b80645b 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java @@ -5,11 +5,11 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.export; -import io.micrometer.core.instrument.Counter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import org.mockito.MockedConstruction; import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.buffer.common.BufferAccumulator; @@ -18,8 +18,7 @@ import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; -import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.DataFilePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.DataFileProgressState; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; @@ -41,14 +40,12 @@ import java.util.zip.GZIPOutputStream; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockConstruction; import static org.mockito.Mockito.mockStatic; -import static org.mockito.Mockito.times; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.source.dynamodb.export.DataFileLoader.BUFFER_TIMEOUT; @@ -57,9 +54,6 @@ @ExtendWith(MockitoExtension.class) class DataFileLoaderTest { - @Mock - private EnhancedSourceCoordinator coordinator; - @Mock private S3Client s3Client; @@ -73,12 +67,13 @@ class DataFileLoaderTest { private BufferAccumulator> bufferAccumulator; @Mock - private Counter testCounter; + private ExportRecordConverter exportRecordConverter; + @Mock + private DataFileCheckpointer checkpointer; - private S3ObjectReader objectReader; - private DataFileCheckpointer checkpointer; + private S3ObjectReader objectReader; private DataFilePartition dataFilePartition; @@ -93,7 +88,6 @@ class DataFileLoaderTest { private final String manifestKey = UUID.randomUUID().toString(); private final String bucketName = UUID.randomUUID().toString(); - private final String prefix = UUID.randomUUID().toString(); private final String exportArn = tableArn + "/export/01693291918297-bfeccbea"; @@ -102,7 +96,7 @@ class DataFileLoaderTest { private final int total = random.nextInt(10); @BeforeEach - void setup() throws Exception { + void setup() { DataFileProgressState state = new DataFileProgressState(); state.setLoaded(0); @@ -120,18 +114,6 @@ void setup() throws Exception { when(s3Client.getObject(any(GetObjectRequest.class))).thenReturn(generateGzipInputStream(total)); objectReader = new S3ObjectReader(s3Client); - - lenient().when(coordinator.createPartition(any(EnhancedSourcePartition.class))).thenReturn(true); - lenient().doNothing().when(coordinator).completePartition(any(EnhancedSourcePartition.class)); - lenient().doNothing().when(coordinator).saveProgressStateForPartition(any(EnhancedSourcePartition.class), eq(null)); - lenient().doNothing().when(coordinator).giveUpPartition(any(EnhancedSourcePartition.class)); - - doNothing().when(bufferAccumulator).add(any(Record.class)); - doNothing().when(bufferAccumulator).flush(); - checkpointer = new DataFileCheckpointer(coordinator, dataFilePartition); - - given(pluginMetrics.counter(anyString())).willReturn(testCounter); - } private ResponseInputStream generateGzipInputStream(int numberOfRecords) { @@ -166,10 +148,13 @@ private ResponseInputStream generateGzipInputStream(int numbe } @Test - void test_run_loadFile_correctly() throws Exception { + void test_run_loadFile_correctly() { DataFileLoader loader; try ( - final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { + final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class); + final MockedConstruction recordConverterMockedConstruction = mockConstruction(ExportRecordConverter.class, (mock, context) -> { + exportRecordConverter = mock; + })) { bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); loader = DataFileLoader.builder(objectReader, pluginMetrics, buffer) .bucketName(bucketName) @@ -184,23 +169,24 @@ void test_run_loadFile_correctly() throws Exception { // Should call s3 getObject verify(s3Client).getObject(any(GetObjectRequest.class)); - // Should write to buffer - verify(bufferAccumulator, times(total)).add(any(Record.class)); - verify(bufferAccumulator).flush(); + verify(exportRecordConverter).writeToBuffer(eq(null), anyList()); - // Should do one last checkpoint when done. - verify(coordinator).saveProgressStateForPartition(any(DataFilePartition.class), eq(null)); + verify(checkpointer).checkpoint(total); + verify(checkpointer, never()).updateDatafileForAcknowledgmentWait(any(Duration.class)); } @Test - void run_loadFile_with_acknowledgments_processes_correctly() throws Exception { + void run_loadFile_with_acknowledgments_processes_correctly() { final AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class); final Duration acknowledgmentTimeout = Duration.ofSeconds(30); DataFileLoader loader; try ( - final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { + final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class); + final MockedConstruction recordConverterMockedConstruction = mockConstruction(ExportRecordConverter.class, (mock, context) -> { + exportRecordConverter = mock; + })) { bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); loader = DataFileLoader.builder(objectReader, pluginMetrics, buffer) .bucketName(bucketName) @@ -217,12 +203,11 @@ void run_loadFile_with_acknowledgments_processes_correctly() throws Exception { // Should call s3 getObject verify(s3Client).getObject(any(GetObjectRequest.class)); - // Should write to buffer - verify(bufferAccumulator, times(total)).add(any(Record.class)); - verify(bufferAccumulator).flush(); + verify(exportRecordConverter).writeToBuffer(eq(acknowledgementSet), anyList()); + + verify(checkpointer).checkpoint(total); + verify(checkpointer).updateDatafileForAcknowledgmentWait(acknowledgmentTimeout); - // Should do one last checkpoint when done. - verify(coordinator).saveProgressStateForPartition(any(DataFilePartition.class), eq(null)); verify(acknowledgementSet).complete(); } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java index 121be0a0b5..b46f50ab3e 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java @@ -31,6 +31,7 @@ import java.util.Optional; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.BACKOFF_ON_EXCEPTION; +import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.DEFAULT_CHECKPOINT_INTERVAL_MILLS; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.calculateExponentialBackoffAndJitter; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.completeIndexPartition; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.createAcknowledgmentSet; @@ -128,6 +129,8 @@ public void run() { private void processIndex(final SourcePartition openSearchIndexPartition, final AcknowledgementSet acknowledgementSet) { final String indexName = openSearchIndexPartition.getPartitionKey(); + long lastCheckpointTime = System.currentTimeMillis(); + LOG.info("Started processing for index: '{}'", indexName); Optional openSearchIndexProgressStateOptional = openSearchIndexPartition.getPartitionState(); @@ -165,7 +168,12 @@ private void processIndex(final SourcePartition op }); openSearchIndexProgressState.setSearchAfter(searchWithSearchAfterResults.getNextSearchAfter()); - sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState); + + if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) { + LOG.debug("Renew ownership of index {}", indexName); + sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState); + lastCheckpointTime = System.currentTimeMillis(); + } } while (searchWithSearchAfterResults.getDocuments().size() == searchConfiguration.getBatchSize()); try { diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java index ee49aee262..9050b6fd87 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java @@ -35,6 +35,7 @@ import java.util.Optional; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.BACKOFF_ON_EXCEPTION; +import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.DEFAULT_CHECKPOINT_INTERVAL_MILLS; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.calculateExponentialBackoffAndJitter; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.completeIndexPartition; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.createAcknowledgmentSet; @@ -150,6 +151,7 @@ public void run() { private void processIndex(final SourcePartition openSearchIndexPartition, final AcknowledgementSet acknowledgementSet) { final String indexName = openSearchIndexPartition.getPartitionKey(); + long lastCheckpointTime = System.currentTimeMillis(); LOG.info("Starting processing for index: '{}'", indexName); Optional openSearchIndexProgressStateOptional = openSearchIndexPartition.getPartitionState(); @@ -203,7 +205,12 @@ private void processIndex(final SourcePartition op openSearchIndexProgressState.setSearchAfter(searchWithSearchAfterResults.getNextSearchAfter()); openSearchIndexProgressState.setKeepAlive(Duration.ofMillis(openSearchIndexProgressState.getKeepAlive()).plus(EXTEND_KEEP_ALIVE_DURATION).toMillis()); - sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState); + + if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) { + LOG.debug("Renew ownership of index {}", indexName); + sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState); + lastCheckpointTime = System.currentTimeMillis(); + } } while (searchWithSearchAfterResults.getDocuments().size() == searchConfiguration.getBatchSize()); try { diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java index ce34521205..be12d40dc6 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java @@ -34,6 +34,7 @@ import java.util.Optional; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.BACKOFF_ON_EXCEPTION; +import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.DEFAULT_CHECKPOINT_INTERVAL_MILLS; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.calculateExponentialBackoffAndJitter; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.completeIndexPartition; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.createAcknowledgmentSet; @@ -145,6 +146,8 @@ public void run() { private void processIndex(final SourcePartition openSearchIndexPartition, final AcknowledgementSet acknowledgementSet) { final String indexName = openSearchIndexPartition.getPartitionKey(); + long lastCheckpointTime = System.currentTimeMillis(); + LOG.info("Started processing for index: '{}'", indexName); final Integer batchSize = openSearchSourceConfiguration.getSearchConfiguration().getBatchSize(); @@ -168,7 +171,12 @@ private void processIndex(final SourcePartition op .build()); writeDocumentsToBuffer(searchScrollResponse.getDocuments(), acknowledgementSet); - sourceCoordinator.saveProgressStateForPartition(indexName, null); + + if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) { + LOG.debug("Renew ownership of index {}", indexName); + sourceCoordinator.saveProgressStateForPartition(indexName, null); + lastCheckpointTime = System.currentTimeMillis(); + } } catch (final Exception e) { deleteScroll(createScrollResponse.getScrollId()); throw e; diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtils.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtils.java index 2bde6c0370..7c5a132017 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtils.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtils.java @@ -27,7 +27,8 @@ public class WorkerCommonUtils { static final Duration BACKOFF_ON_EXCEPTION = Duration.ofSeconds(60); - static final Duration ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofHours(2); + static final long DEFAULT_CHECKPOINT_INTERVAL_MILLS = 5 * 60_000; + static final Duration ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofHours(1); static final Duration STARTING_BACKOFF = Duration.ofMillis(500); static final Duration MAX_BACKOFF = Duration.ofSeconds(60); static final int BACKOFF_RATE = 2; diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java index 2397aa87b0..e57d40f266 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java @@ -206,7 +206,7 @@ void run_with_getNextPartition_with_non_empty_partition_processes_and_closes_tha assertThat(executorService.awaitTermination(100, TimeUnit.MILLISECONDS), equalTo(true)); verify(searchAccessor, times(2)).searchWithoutSearchContext(any(NoSearchContextSearchRequest.class)); - verify(sourceCoordinator, times(2)).saveProgressStateForPartition(eq(partitionKey), any(OpenSearchIndexProgressState.class)); + verify(sourceCoordinator, times(0)).saveProgressStateForPartition(eq(partitionKey), any(OpenSearchIndexProgressState.class)); final List noSearchContextSearchRequests = searchRequestArgumentCaptor.getAllValues(); assertThat(noSearchContextSearchRequests.size(), equalTo(2)); @@ -283,7 +283,7 @@ void run_with_getNextPartition_with_acknowledgments_processes_and_closes_that_pa assertThat(executorService.awaitTermination(100, TimeUnit.MILLISECONDS), equalTo(true)); verify(searchAccessor, times(2)).searchWithoutSearchContext(any(NoSearchContextSearchRequest.class)); - verify(sourceCoordinator, times(2)).saveProgressStateForPartition(eq(partitionKey), any(OpenSearchIndexProgressState.class)); + verify(sourceCoordinator, times(0)).saveProgressStateForPartition(eq(partitionKey), any(OpenSearchIndexProgressState.class)); final List noSearchContextSearchRequests = searchRequestArgumentCaptor.getAllValues(); assertThat(noSearchContextSearchRequests.size(), equalTo(2)); diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java index 7784f7ddff..a333925137 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java @@ -191,7 +191,7 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_pit_ assertThat(createPointInTimeRequest.getKeepAlive(), equalTo(STARTING_KEEP_ALIVE)); verify(searchAccessor, times(2)).searchWithPit(any(SearchPointInTimeRequest.class)); - verify(sourceCoordinator, times(2)).saveProgressStateForPartition(eq(partitionKey), any(OpenSearchIndexProgressState.class)); + verify(sourceCoordinator, times(0)).saveProgressStateForPartition(eq(partitionKey), any(OpenSearchIndexProgressState.class)); final List searchPointInTimeRequestList = searchPointInTimeRequestArgumentCaptor.getAllValues(); assertThat(searchPointInTimeRequestList.size(), equalTo(2)); @@ -292,7 +292,7 @@ void run_with_acknowledgments_enabled_creates_and_deletes_pit_and_closes_that_pa assertThat(createPointInTimeRequest.getKeepAlive(), equalTo(STARTING_KEEP_ALIVE)); verify(searchAccessor, times(2)).searchWithPit(any(SearchPointInTimeRequest.class)); - verify(sourceCoordinator, times(2)).saveProgressStateForPartition(eq(partitionKey), any(OpenSearchIndexProgressState.class)); + verify(sourceCoordinator, times(0)).saveProgressStateForPartition(eq(partitionKey), any(OpenSearchIndexProgressState.class)); final List searchPointInTimeRequestList = searchPointInTimeRequestArgumentCaptor.getAllValues(); assertThat(searchPointInTimeRequestList.size(), equalTo(2)); @@ -378,7 +378,7 @@ void run_with_getNextPartition_with_valid_existing_point_in_time_does_not_create verify(searchAccessor, never()).createPit(any(CreatePointInTimeRequest.class)); verify(searchAccessor, times(2)).searchWithPit(any(SearchPointInTimeRequest.class)); - verify(sourceCoordinator, times(2)).saveProgressStateForPartition(eq(partitionKey), eq(openSearchIndexProgressState)); + verify(sourceCoordinator, times(0)).saveProgressStateForPartition(eq(partitionKey), eq(openSearchIndexProgressState)); verify(sourceCoordinator, times(0)).updatePartitionForAcknowledgmentWait(anyString(), any(Duration.class)); verify(documentsProcessedCounter, times(3)).increment(); diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java index 63f88c272c..ffd24b5972 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java @@ -191,7 +191,7 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_scro assertThat(createScrollRequest.getScrollTime(), equalTo(SCROLL_TIME_PER_BATCH)); verify(searchAccessor, times(2)).searchWithScroll(any(SearchScrollRequest.class)); - verify(sourceCoordinator, times(2)).saveProgressStateForPartition(eq(partitionKey), eq(null)); + verify(sourceCoordinator, times(0)).saveProgressStateForPartition(eq(partitionKey), eq(null)); final List searchScrollRequests = searchScrollRequestArgumentCaptor.getAllValues(); assertThat(searchScrollRequests.size(), equalTo(2)); @@ -286,7 +286,7 @@ void run_with_getNextPartition_with_acknowledgments_creates_and_deletes_scroll_a assertThat(createScrollRequest.getScrollTime(), equalTo(SCROLL_TIME_PER_BATCH)); verify(searchAccessor, times(2)).searchWithScroll(any(SearchScrollRequest.class)); - verify(sourceCoordinator, times(2)).saveProgressStateForPartition(eq(partitionKey), eq(null)); + verify(sourceCoordinator, times(0)).saveProgressStateForPartition(eq(partitionKey), eq(null)); final List searchScrollRequests = searchScrollRequestArgumentCaptor.getAllValues(); assertThat(searchScrollRequests.size(), equalTo(2)); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java index 9f8f501fb6..793029f6ef 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java @@ -259,7 +259,7 @@ private void handleFailures(final AccumulatingBulkRequest> actions, final Expres for (final Map actionMap: actions) { String action = (String)actionMap.get("type"); if (action != null) { - checkArgument((EnumUtils.isValidEnumIgnoreCase(OpenSearchBulkActions.class, action) || JacksonEvent.isValidFormatExpressions(action, expressionEvaluator)), "action must be one of the following: " + OpenSearchBulkActions.values()); + checkArgument((EnumUtils.isValidEnumIgnoreCase(OpenSearchBulkActions.class, action) || + (action.contains("${") && JacksonEvent.isValidFormatExpressions(action, expressionEvaluator))), "Invalid action \"" + action + "\". action must be one of the following: " + Arrays.stream(OpenSearchBulkActions.values()).collect(Collectors.toList())); } } this.actions = actions; diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorker.java index 024cb05903..0397183877 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorker.java @@ -22,6 +22,7 @@ import java.time.Duration; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; /** @@ -30,7 +31,8 @@ */ class S3ObjectWorker implements S3ObjectHandler { private static final Logger LOG = LoggerFactory.getLogger(S3ObjectWorker.class); - static final int RECORDS_TO_ACCUMULATE_TO_SAVE_STATE = 10_000; + private static final long DEFAULT_CHECKPOINT_INTERVAL_MILLS = 5 * 60_000; + private final S3Client s3Client; private final Buffer> buffer; @@ -83,6 +85,7 @@ private void doParseObject(final AcknowledgementSet acknowledgementSet, final long totalBytesRead; LOG.info("Read S3 object: {}", s3ObjectReference); + AtomicLong lastCheckpointTime = new AtomicLong(System.currentTimeMillis()); final S3InputFile inputFile = new S3InputFile(s3Client, s3ObjectReference, bucketOwnerProvider, s3ObjectPluginMetrics); @@ -104,10 +107,12 @@ private void doParseObject(final AcknowledgementSet acknowledgementSet, acknowledgementSet.add(record.getData()); } bufferAccumulator.add(record); - int recordsWrittenAfterLastSaveState = bufferAccumulator.getTotalWritten() - saveStateCounter.get() * RECORDS_TO_ACCUMULATE_TO_SAVE_STATE; - // Saving state to renew source coordination ownership for every 10,000 records, ownership time is 10 minutes - if (recordsWrittenAfterLastSaveState >= RECORDS_TO_ACCUMULATE_TO_SAVE_STATE && sourceCoordinator != null && partitionKey != null) { + + if (sourceCoordinator != null && partitionKey != null && + (System.currentTimeMillis() - lastCheckpointTime.get() > DEFAULT_CHECKPOINT_INTERVAL_MILLS)) { + LOG.debug("Renew partition ownership for the object {}", partitionKey); sourceCoordinator.saveProgressStateForPartition(partitionKey, null); + lastCheckpointTime.set(System.currentTimeMillis()); saveStateCounter.getAndIncrement(); } } catch (final Exception e) { diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorkerTest.java index 2cbd5bbf47..eadc9a7b8b 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorkerTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorkerTest.java @@ -59,10 +59,10 @@ import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; -import static org.opensearch.dataprepper.plugins.source.s3.S3ObjectWorker.RECORDS_TO_ACCUMULATE_TO_SAVE_STATE; @ExtendWith(MockitoExtension.class) class S3ObjectWorkerTest { @@ -306,14 +306,12 @@ void parseS3Object_calls_Codec_parse_with_Consumer_that_adds_to_BufferAccumulato final Record record = mock(Record.class); final Event event = mock(Event.class); when(record.getData()).thenReturn(event); - when(bufferAccumulator.getTotalWritten()).thenReturn(RECORDS_TO_ACCUMULATE_TO_SAVE_STATE + 1); - consumerUnderTest.accept(record); final InOrder inOrder = inOrder(eventConsumer, bufferAccumulator, sourceCoordinator); inOrder.verify(eventConsumer).accept(event, s3ObjectReference); inOrder.verify(bufferAccumulator).add(record); - inOrder.verify(sourceCoordinator).saveProgressStateForPartition(testPartitionKey, null); + inOrder.verify(sourceCoordinator, times(0)).saveProgressStateForPartition(testPartitionKey, null); } @Test