Skip to content

Commit

Permalink
Fix a bug with event listener in RDS source (#4962)
Browse files Browse the repository at this point in the history
Unregister eventListener on stop

Signed-off-by: Hai Yan <oeyh@amazon.com>
  • Loading branch information
oeyh committed Sep 20, 2024
1 parent aaef847 commit 18307d3
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public void onEvent(com.github.shyiko.mysql.binlog.event.Event event) {
public void stopClient() {
try {
binaryLogClient.disconnect();
binaryLogClient.unregisterEventListener(this);
binlogEventExecutorService.shutdownNow();
LOG.info("Binary log client disconnected.");
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
Expand All @@ -26,6 +27,7 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -34,6 +36,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -103,6 +106,16 @@ void test_given_TableMap_event_then_calls_correct_handler() {
verify(objectUnderTest).handleTableMapEvent(binlogEvent);
}

@Test
void test_stopClient() throws IOException {
objectUnderTest.stopClient();

InOrder inOrder = inOrder(binaryLogClient, eventListnerExecutorService);
inOrder.verify(binaryLogClient).disconnect();
inOrder.verify(binaryLogClient).unregisterEventListener(objectUnderTest);
inOrder.verify(eventListnerExecutorService).shutdownNow();
}

@ParameterizedTest
@EnumSource(names = {"WRITE_ROWS", "EXT_WRITE_ROWS"})
void test_given_WriteRows_event_then_calls_correct_handler(EventType eventType) {
Expand Down

0 comments on commit 18307d3

Please sign in to comment.