Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a bug with event listener in RDS source #4962

Merged

Conversation

oeyh
Copy link
Collaborator

@oeyh oeyh commented Sep 19, 2024

Description

Fix a bug by deregistering event listener when binlog client is disconnected.

Issues Resolved

When receiving negative acknowledgment, RDS source plugin will disconnect binlog client and reconnect with latest checkpoint. When reconnected, however, the client cannot process events any more with this error:

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@56c0293[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@33d53db2[Wrapped task = org.opensearch.dataprepper.plugins.source.rds.stream.BinlogEventListener$$Lambda$3144/0x000000080161d578@db0faf8]] rejected from java.util.concurrent.ThreadPoolExecutor@53e18682[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 12717915]

This is because the old event listener is still registered and accepting events but its executor pool has been shutdown.

Testing

Was able to reproduce the issue by setting ack always to negative and run an RDS source pipeline. Also verified the fix with the same setup.

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Hai Yan <oeyh@amazon.com>
@@ -146,6 +146,7 @@ public void onEvent(com.github.shyiko.mysql.binlog.event.Event event) {
public void stopClient() {
try {
binaryLogClient.disconnect();
binaryLogClient.unregisterEventListener(this);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a unit test for shutdown? Can you verify that this was called there? Also, we should verify in order.

InOrder inOrder = inOrder(binaryLogClient, binlogExecutorService);
inOrder.verify(binaryLogClient).disconnect();
inOrder.verify(binaryLogClient).disconnect());
inOrder.verify(binlogExecutorService).disconnect());

Copy link
Collaborator Author

@oeyh oeyh Sep 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Added.

Signed-off-by: Hai Yan <oeyh@amazon.com>
@dlvenable dlvenable merged commit 18307d3 into opensearch-project:main Sep 20, 2024
45 of 47 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants