Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change s3 scan and opensearch to only save state every 5 minutes, fix… #3581

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ public static boolean isValidFormatExpressions(final String format, final Expres
if (Objects.isNull(expressionEvaluator)) {
return false;
}

int fromIndex = 0;
int position = 0;
while ((position = format.indexOf("${", fromIndex)) != -1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@

package org.opensearch.dataprepper.plugins.source.dynamodb.export;

import io.micrometer.core.instrument.Counter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockedConstruction;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
Expand All @@ -18,8 +18,7 @@
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.DataFilePartition;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.DataFileProgressState;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
Expand All @@ -41,14 +40,12 @@
import java.util.zip.GZIPOutputStream;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.source.dynamodb.export.DataFileLoader.BUFFER_TIMEOUT;
Expand All @@ -57,9 +54,6 @@
@ExtendWith(MockitoExtension.class)
class DataFileLoaderTest {

@Mock
private EnhancedSourceCoordinator coordinator;

@Mock
private S3Client s3Client;

Expand All @@ -73,12 +67,13 @@ class DataFileLoaderTest {
private BufferAccumulator<Record<Event>> bufferAccumulator;

@Mock
private Counter testCounter;
private ExportRecordConverter exportRecordConverter;

@Mock
private DataFileCheckpointer checkpointer;

private S3ObjectReader objectReader;

private DataFileCheckpointer checkpointer;
private S3ObjectReader objectReader;

private DataFilePartition dataFilePartition;

Expand All @@ -93,7 +88,6 @@ class DataFileLoaderTest {

private final String manifestKey = UUID.randomUUID().toString();
private final String bucketName = UUID.randomUUID().toString();
private final String prefix = UUID.randomUUID().toString();

private final String exportArn = tableArn + "/export/01693291918297-bfeccbea";

Expand All @@ -102,7 +96,7 @@ class DataFileLoaderTest {
private final int total = random.nextInt(10);

@BeforeEach
void setup() throws Exception {
void setup() {

DataFileProgressState state = new DataFileProgressState();
state.setLoaded(0);
Expand All @@ -120,18 +114,6 @@ void setup() throws Exception {

when(s3Client.getObject(any(GetObjectRequest.class))).thenReturn(generateGzipInputStream(total));
objectReader = new S3ObjectReader(s3Client);

lenient().when(coordinator.createPartition(any(EnhancedSourcePartition.class))).thenReturn(true);
lenient().doNothing().when(coordinator).completePartition(any(EnhancedSourcePartition.class));
lenient().doNothing().when(coordinator).saveProgressStateForPartition(any(EnhancedSourcePartition.class), eq(null));
lenient().doNothing().when(coordinator).giveUpPartition(any(EnhancedSourcePartition.class));

doNothing().when(bufferAccumulator).add(any(Record.class));
doNothing().when(bufferAccumulator).flush();
checkpointer = new DataFileCheckpointer(coordinator, dataFilePartition);

given(pluginMetrics.counter(anyString())).willReturn(testCounter);

}

private ResponseInputStream<GetObjectResponse> generateGzipInputStream(int numberOfRecords) {
Expand Down Expand Up @@ -166,10 +148,13 @@ private ResponseInputStream<GetObjectResponse> generateGzipInputStream(int numbe
}

@Test
void test_run_loadFile_correctly() throws Exception {
void test_run_loadFile_correctly() {
DataFileLoader loader;
try (
final MockedStatic<BufferAccumulator> bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) {
final MockedStatic<BufferAccumulator> bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class);
final MockedConstruction<ExportRecordConverter> recordConverterMockedConstruction = mockConstruction(ExportRecordConverter.class, (mock, context) -> {
exportRecordConverter = mock;
})) {
bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator);
loader = DataFileLoader.builder(objectReader, pluginMetrics, buffer)
.bucketName(bucketName)
Expand All @@ -184,23 +169,24 @@ void test_run_loadFile_correctly() throws Exception {
// Should call s3 getObject
verify(s3Client).getObject(any(GetObjectRequest.class));

// Should write to buffer
verify(bufferAccumulator, times(total)).add(any(Record.class));
verify(bufferAccumulator).flush();
verify(exportRecordConverter).writeToBuffer(eq(null), anyList());

// Should do one last checkpoint when done.
verify(coordinator).saveProgressStateForPartition(any(DataFilePartition.class), eq(null));
Comment on lines -191 to -192
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this removed ? Is there a way to validate it ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The coordinator class is not used in the DataFileLoader. This check was here because we weren't mocking the checkpointer class. Now we are mocking it, so the behavior is tested in the Checkpointer unit tests.

verify(checkpointer).checkpoint(total);
verify(checkpointer, never()).updateDatafileForAcknowledgmentWait(any(Duration.class));
}

@Test
void run_loadFile_with_acknowledgments_processes_correctly() throws Exception {
void run_loadFile_with_acknowledgments_processes_correctly() {

final AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class);
final Duration acknowledgmentTimeout = Duration.ofSeconds(30);

DataFileLoader loader;
try (
final MockedStatic<BufferAccumulator> bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) {
final MockedStatic<BufferAccumulator> bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class);
final MockedConstruction<ExportRecordConverter> recordConverterMockedConstruction = mockConstruction(ExportRecordConverter.class, (mock, context) -> {
exportRecordConverter = mock;
})) {
bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator);
loader = DataFileLoader.builder(objectReader, pluginMetrics, buffer)
.bucketName(bucketName)
Expand All @@ -217,12 +203,11 @@ void run_loadFile_with_acknowledgments_processes_correctly() throws Exception {
// Should call s3 getObject
verify(s3Client).getObject(any(GetObjectRequest.class));

// Should write to buffer
verify(bufferAccumulator, times(total)).add(any(Record.class));
verify(bufferAccumulator).flush();
verify(exportRecordConverter).writeToBuffer(eq(acknowledgementSet), anyList());

verify(checkpointer).checkpoint(total);
verify(checkpointer).updateDatafileForAcknowledgmentWait(acknowledgmentTimeout);

// Should do one last checkpoint when done.
verify(coordinator).saveProgressStateForPartition(any(DataFilePartition.class), eq(null));

verify(acknowledgementSet).complete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Optional;

import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.BACKOFF_ON_EXCEPTION;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.DEFAULT_CHECKPOINT_INTERVAL_MILLS;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.calculateExponentialBackoffAndJitter;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.completeIndexPartition;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.createAcknowledgmentSet;
Expand Down Expand Up @@ -128,6 +129,8 @@ public void run() {
private void processIndex(final SourcePartition<OpenSearchIndexProgressState> openSearchIndexPartition,
final AcknowledgementSet acknowledgementSet) {
final String indexName = openSearchIndexPartition.getPartitionKey();
long lastCheckpointTime = System.currentTimeMillis();

LOG.info("Started processing for index: '{}'", indexName);

Optional<OpenSearchIndexProgressState> openSearchIndexProgressStateOptional = openSearchIndexPartition.getPartitionState();
Expand Down Expand Up @@ -165,7 +168,12 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
});

openSearchIndexProgressState.setSearchAfter(searchWithSearchAfterResults.getNextSearchAfter());
sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState);

if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) {
LOG.debug("Renew ownership of index {}", indexName);
sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState);
lastCheckpointTime = System.currentTimeMillis();
}
} while (searchWithSearchAfterResults.getDocuments().size() == searchConfiguration.getBatchSize());

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Optional;

import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.BACKOFF_ON_EXCEPTION;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.DEFAULT_CHECKPOINT_INTERVAL_MILLS;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.calculateExponentialBackoffAndJitter;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.completeIndexPartition;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.createAcknowledgmentSet;
Expand Down Expand Up @@ -150,6 +151,7 @@ public void run() {
private void processIndex(final SourcePartition<OpenSearchIndexProgressState> openSearchIndexPartition,
final AcknowledgementSet acknowledgementSet) {
final String indexName = openSearchIndexPartition.getPartitionKey();
long lastCheckpointTime = System.currentTimeMillis();

LOG.info("Starting processing for index: '{}'", indexName);
Optional<OpenSearchIndexProgressState> openSearchIndexProgressStateOptional = openSearchIndexPartition.getPartitionState();
Expand Down Expand Up @@ -203,7 +205,12 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op

openSearchIndexProgressState.setSearchAfter(searchWithSearchAfterResults.getNextSearchAfter());
openSearchIndexProgressState.setKeepAlive(Duration.ofMillis(openSearchIndexProgressState.getKeepAlive()).plus(EXTEND_KEEP_ALIVE_DURATION).toMillis());
sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState);

if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) {
LOG.debug("Renew ownership of index {}", indexName);
sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState);
lastCheckpointTime = System.currentTimeMillis();
}
} while (searchWithSearchAfterResults.getDocuments().size() == searchConfiguration.getBatchSize());

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Optional;

import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.BACKOFF_ON_EXCEPTION;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.DEFAULT_CHECKPOINT_INTERVAL_MILLS;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.calculateExponentialBackoffAndJitter;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.completeIndexPartition;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.createAcknowledgmentSet;
Expand Down Expand Up @@ -145,6 +146,8 @@ public void run() {
private void processIndex(final SourcePartition<OpenSearchIndexProgressState> openSearchIndexPartition,
final AcknowledgementSet acknowledgementSet) {
final String indexName = openSearchIndexPartition.getPartitionKey();
long lastCheckpointTime = System.currentTimeMillis();

LOG.info("Started processing for index: '{}'", indexName);

final Integer batchSize = openSearchSourceConfiguration.getSearchConfiguration().getBatchSize();
Expand All @@ -168,7 +171,12 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
.build());

writeDocumentsToBuffer(searchScrollResponse.getDocuments(), acknowledgementSet);
sourceCoordinator.saveProgressStateForPartition(indexName, null);

if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) {
LOG.debug("Renew ownership of index {}", indexName);
sourceCoordinator.saveProgressStateForPartition(indexName, null);
lastCheckpointTime = System.currentTimeMillis();
}
} catch (final Exception e) {
deleteScroll(createScrollResponse.getScrollId());
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ public class WorkerCommonUtils {

static final Duration BACKOFF_ON_EXCEPTION = Duration.ofSeconds(60);

static final Duration ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofHours(2);
static final long DEFAULT_CHECKPOINT_INTERVAL_MILLS = 5 * 60_000;
static final Duration ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofHours(1);
static final Duration STARTING_BACKOFF = Duration.ofMillis(500);
static final Duration MAX_BACKOFF = Duration.ofSeconds(60);
static final int BACKOFF_RATE = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ void run_with_getNextPartition_with_non_empty_partition_processes_and_closes_tha
assertThat(executorService.awaitTermination(100, TimeUnit.MILLISECONDS), equalTo(true));

verify(searchAccessor, times(2)).searchWithoutSearchContext(any(NoSearchContextSearchRequest.class));
verify(sourceCoordinator, times(2)).saveProgressStateForPartition(eq(partitionKey), any(OpenSearchIndexProgressState.class));
verify(sourceCoordinator, times(0)).saveProgressStateForPartition(eq(partitionKey), any(OpenSearchIndexProgressState.class));

final List<NoSearchContextSearchRequest> noSearchContextSearchRequests = searchRequestArgumentCaptor.getAllValues();
assertThat(noSearchContextSearchRequests.size(), equalTo(2));
Expand Down Expand Up @@ -283,7 +283,7 @@ void run_with_getNextPartition_with_acknowledgments_processes_and_closes_that_pa
assertThat(executorService.awaitTermination(100, TimeUnit.MILLISECONDS), equalTo(true));

verify(searchAccessor, times(2)).searchWithoutSearchContext(any(NoSearchContextSearchRequest.class));
verify(sourceCoordinator, times(2)).saveProgressStateForPartition(eq(partitionKey), any(OpenSearchIndexProgressState.class));
verify(sourceCoordinator, times(0)).saveProgressStateForPartition(eq(partitionKey), any(OpenSearchIndexProgressState.class));

final List<NoSearchContextSearchRequest> noSearchContextSearchRequests = searchRequestArgumentCaptor.getAllValues();
assertThat(noSearchContextSearchRequests.size(), equalTo(2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_pit_
assertThat(createPointInTimeRequest.getKeepAlive(), equalTo(STARTING_KEEP_ALIVE));

verify(searchAccessor, times(2)).searchWithPit(any(SearchPointInTimeRequest.class));
verify(sourceCoordinator, times(2)).saveProgressStateForPartition(eq(partitionKey), any(OpenSearchIndexProgressState.class));
verify(sourceCoordinator, times(0)).saveProgressStateForPartition(eq(partitionKey), any(OpenSearchIndexProgressState.class));

final List<SearchPointInTimeRequest> searchPointInTimeRequestList = searchPointInTimeRequestArgumentCaptor.getAllValues();
assertThat(searchPointInTimeRequestList.size(), equalTo(2));
Expand Down Expand Up @@ -292,7 +292,7 @@ void run_with_acknowledgments_enabled_creates_and_deletes_pit_and_closes_that_pa
assertThat(createPointInTimeRequest.getKeepAlive(), equalTo(STARTING_KEEP_ALIVE));

verify(searchAccessor, times(2)).searchWithPit(any(SearchPointInTimeRequest.class));
verify(sourceCoordinator, times(2)).saveProgressStateForPartition(eq(partitionKey), any(OpenSearchIndexProgressState.class));
verify(sourceCoordinator, times(0)).saveProgressStateForPartition(eq(partitionKey), any(OpenSearchIndexProgressState.class));

final List<SearchPointInTimeRequest> searchPointInTimeRequestList = searchPointInTimeRequestArgumentCaptor.getAllValues();
assertThat(searchPointInTimeRequestList.size(), equalTo(2));
Expand Down Expand Up @@ -378,7 +378,7 @@ void run_with_getNextPartition_with_valid_existing_point_in_time_does_not_create

verify(searchAccessor, never()).createPit(any(CreatePointInTimeRequest.class));
verify(searchAccessor, times(2)).searchWithPit(any(SearchPointInTimeRequest.class));
verify(sourceCoordinator, times(2)).saveProgressStateForPartition(eq(partitionKey), eq(openSearchIndexProgressState));
verify(sourceCoordinator, times(0)).saveProgressStateForPartition(eq(partitionKey), eq(openSearchIndexProgressState));
verify(sourceCoordinator, times(0)).updatePartitionForAcknowledgmentWait(anyString(), any(Duration.class));

verify(documentsProcessedCounter, times(3)).increment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_scro
assertThat(createScrollRequest.getScrollTime(), equalTo(SCROLL_TIME_PER_BATCH));

verify(searchAccessor, times(2)).searchWithScroll(any(SearchScrollRequest.class));
verify(sourceCoordinator, times(2)).saveProgressStateForPartition(eq(partitionKey), eq(null));
verify(sourceCoordinator, times(0)).saveProgressStateForPartition(eq(partitionKey), eq(null));

final List<SearchScrollRequest> searchScrollRequests = searchScrollRequestArgumentCaptor.getAllValues();
assertThat(searchScrollRequests.size(), equalTo(2));
Expand Down Expand Up @@ -286,7 +286,7 @@ void run_with_getNextPartition_with_acknowledgments_creates_and_deletes_scroll_a
assertThat(createScrollRequest.getScrollTime(), equalTo(SCROLL_TIME_PER_BATCH));

verify(searchAccessor, times(2)).searchWithScroll(any(SearchScrollRequest.class));
verify(sourceCoordinator, times(2)).saveProgressStateForPartition(eq(partitionKey), eq(null));
verify(sourceCoordinator, times(0)).saveProgressStateForPartition(eq(partitionKey), eq(null));

final List<SearchScrollRequest> searchScrollRequests = searchScrollRequestArgumentCaptor.getAllValues();
assertThat(searchScrollRequests.size(), equalTo(2));
Expand Down
Loading
Loading