Change s3 scan and opensearch to only save state every 5 minutes, fix… #3581
… bug where any action was valid in OpenSearch sink Signed-off-by: Taylor Gray <tylgry@amazon.com>
if (sourceCoordinator != null && partitionKey != null &&
        (System.currentTimeMillis() - lastCheckpointTime.get() > DEFAULT_CHECKPOINT_INTERVAL_MILLS)) {
    LOG.debug("Renew partition ownership for the object {}", partitionKey);
    lastCheckpointTime.set(System.currentTimeMillis());
Should this be set after saveProgressState is called? What happens if the saveProgressState API call fails?
Good call, will swap the order.
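A minimal sketch of the swapped order discussed in this thread: save first, then record the timestamp, so that a failed save leaves the checkpoint due and the next call retries it. `TimedCheckpointer`, `maybeCheckpoint`, the injected clock, and the `Runnable` standing in for the `saveProgressState` call are all hypothetical names for illustration, not the classes changed in this PR.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Hypothetical helper for illustration only; not the class from this PR. */
final class TimedCheckpointer {
    private final long intervalMillis;
    private final LongSupplier clock;   // e.g. System::currentTimeMillis
    private final Runnable saveAction;  // stands in for the saveProgressState call
    private final AtomicLong lastCheckpointTime;

    TimedCheckpointer(long intervalMillis, LongSupplier clock, Runnable saveAction) {
        this.intervalMillis = intervalMillis;
        this.clock = clock;
        this.saveAction = saveAction;
        this.lastCheckpointTime = new AtomicLong(clock.getAsLong());
    }

    /** Saves only when the interval has elapsed; returns true if a save ran and succeeded. */
    boolean maybeCheckpoint() {
        if (clock.getAsLong() - lastCheckpointTime.get() <= intervalMillis) {
            return false;
        }
        // If this throws, the timestamp below is never updated,
        // so the checkpoint stays due and the next call tries again.
        saveAction.run();
        lastCheckpointTime.set(clock.getAsLong());
        return true;
    }
}
```

With the original order (timestamp first, save second), a failed save would suppress the next attempt for a full interval.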
// Should do one last checkpoint when done.
verify(coordinator).saveProgressStateForPartition(any(DataFilePartition.class), eq(null));
Why is this removed? Is there a way to validate it?
The coordinator class is not used in the DataFileLoader. This check was here because we weren't mocking the checkpointer class. Now we are mocking it, so the behavior is tested in the Checkpointer unit tests.
Signed-off-by: Taylor Gray <tylgry@amazon.com>
Description
saveProgressState writes to the coordination store for partitions in S3 scan and the OpenSearch source. Previously both sources called it every 10,000 events, which was far too short an interval: they saved state too often, leading to wasted writes on the store. A time-based interval is a better fit. The default lease timeout is 10 minutes, so they now save every 5 minutes.
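To make the write reduction concrete, here is a rough back-of-the-envelope sketch comparing the two rules. `CheckpointMath` and its methods are hypothetical helpers, and the throughput used in the note below is an assumed figure, not a measurement from this PR.

```java
import java.time.Duration;

/** Hypothetical arithmetic helper for illustration only. */
final class CheckpointMath {
    private CheckpointMath() {
    }

    /** Saves under a count-based rule: one per `eventsPerSave` events processed. */
    static long countBasedSaves(long totalEvents, long eventsPerSave) {
        return totalEvents / eventsPerSave;
    }

    /** Approximate saves under a time-based rule: one per full `interval` elapsed. */
    static long timeBasedSaves(Duration elapsed, Duration interval) {
        return elapsed.toMillis() / interval.toMillis();
    }
}
```

Assuming a partition processes 5,000 events per second for 10 minutes (3,000,000 events), `countBasedSaves(3_000_000, 10_000)` gives 300 writes, while `timeBasedSaves(Duration.ofMinutes(10), Duration.ofMinutes(5))` gives 2. The 5-minute interval is half the 10-minute default lease timeout, so ownership is still renewed before the lease expires.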
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.