Change s3 scan and opensearch to only save state every 5 minutes, fix… #3581

graytaylor0
Member

… bug where any action was valid in OpenSearch sink

Signed-off-by: Taylor Gray tylgry@amazon.com

Description

  • Changes the rate at which saveProgressState is called on the coordination store for partitions in the s3 scan and opensearch sources. Previously they called saveState every 10,000 Events, which was far too often and led to wasted writes on the store. A time-based interval is a better fit: the default lease timeout is 10 minutes, so they now save state every 5 minutes.
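As a rough illustration of the time-based approach described above, here is a minimal sketch of a checkpointer that saves at most once per interval, no matter how many events pass through it. The class `TimedCheckpointer`, its constructor parameters, and the injected clock are hypothetical names for this example and are not taken from the PR; the `Runnable` merely stands in for the write to the coordination store.

```java
import java.util.function.LongSupplier;

/** Hypothetical sketch: runs a save action at most once per interval. */
final class TimedCheckpointer {
    private final long intervalMillis;
    private final LongSupplier clock;   // injectable so the interval can be tested
    private final Runnable saveAction;  // stands in for the coordination-store write
    private long lastCheckpointMillis;

    TimedCheckpointer(long intervalMillis, LongSupplier clock, Runnable saveAction) {
        this.intervalMillis = intervalMillis;
        this.clock = clock;
        this.saveAction = saveAction;
        this.lastCheckpointMillis = clock.getAsLong();
    }

    /** Call once per event or batch; returns true if state was saved. */
    boolean maybeCheckpoint() {
        long now = clock.getAsLong();
        if (now - lastCheckpointMillis <= intervalMillis) {
            return false; // interval has not elapsed, skip the write
        }
        saveAction.run();
        lastCheckpointMillis = now;
        return true;
    }
}
```

With a 5 minute interval and a 10 minute lease timeout, a healthy worker gets roughly two chances to renew before its lease would expire, while the number of store writes no longer scales with event volume.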

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

… bug where any action was valid in OpenSearch sink

Signed-off-by: Taylor Gray <tylgry@amazon.com>
@graytaylor0 graytaylor0 force-pushed the FixCheckpointRateForOSSourceAndS3Scan branch from 0bf0917 to 823da1e Compare November 3, 2023 05:53
if (sourceCoordinator != null && partitionKey != null &&
        (System.currentTimeMillis() - lastCheckpointTime.get() > DEFAULT_CHECKPOINT_INTERVAL_MILLS)) {
    LOG.debug("Renew partition ownership for the object {}", partitionKey);
    lastCheckpointTime.set(System.currentTimeMillis());
Member

Should this be set after saveProgressState is called? What happens if the saveProgressState API fails?

Member Author

Good call, will swap the order.
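To make the ordering concern concrete, the sketch below saves first and records the checkpoint time only when the save succeeds, so a failed save leaves the old timestamp in place and the next interval check retries. `CheckpointOrdering`, `ProgressSaver`, and `saveThenRecord` are hypothetical names for illustration; how the merged code actually handles a failed save is not shown in this thread, and swallowing the exception here is just one possible choice.

```java
import java.util.concurrent.atomic.AtomicLong;

final class CheckpointOrdering {
    /** Hypothetical stand-in for the store write, which may fail. */
    interface ProgressSaver {
        void save() throws Exception;
    }

    /**
     * Saves first and records the checkpoint time only on success.
     * Returns true if the save succeeded.
     */
    static boolean saveThenRecord(ProgressSaver saver, AtomicLong lastCheckpointTime, long nowMillis) {
        try {
            saver.save();
        } catch (Exception e) {
            // Timestamp is left untouched, so the next interval check retries.
            return false;
        }
        lastCheckpointTime.set(nowMillis);
        return true;
    }
}
```

Had the timestamp been set before the save, a failure would silently postpone the next attempt by a full interval, which matters when the interval is half the lease timeout.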

Comment on lines -191 to -192
// Should do one last checkpoint when done.
verify(coordinator).saveProgressStateForPartition(any(DataFilePartition.class), eq(null));
Member

Why is this removed? Is there a way to validate it?

Member Author

The coordinator class is not used in the DataFileLoader. This check was here because we weren't mocking the checkpointer class. Now that we mock it, the behavior is tested in the Checkpointer unit tests.

Signed-off-by: Taylor Gray <tylgry@amazon.com>
@graytaylor0 graytaylor0 merged commit fa69169 into opensearch-project:main Nov 5, 2023
63 checks passed
3 participants