From 18307d39836061ace65f31ea41ea4bffb78c7501 Mon Sep 17 00:00:00 2001 From: Hai Yan <8153134+oeyh@users.noreply.github.com> Date: Fri, 20 Sep 2024 11:41:45 -0500 Subject: [PATCH] Fix a bug with event listener in RDS source (#4962) Unregister eventListener on stop Signed-off-by: Hai Yan --- .../source/rds/stream/BinlogEventListener.java | 1 + .../source/rds/stream/BinlogEventListenerTest.java | 13 +++++++++++++ 2 files changed, 14 insertions(+) diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java index cd12711f27..fbe1aae36f 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java @@ -146,6 +146,7 @@ public void onEvent(com.github.shyiko.mysql.binlog.event.Event event) { public void stopClient() { try { binaryLogClient.disconnect(); + binaryLogClient.unregisterEventListener(this); binlogEventExecutorService.shutdownNow(); LOG.info("Binary log client disconnected."); } catch (Exception e) { diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java index 27f3fa9037..1312607821 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java @@ -16,6 +16,7 @@ import org.junit.jupiter.params.provider.EnumSource; import org.mockito.Answers; import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; @@ -26,6 +27,7 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig; +import java.io.IOException; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -34,6 +36,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -103,6 +106,16 @@ void test_given_TableMap_event_then_calls_correct_handler() { verify(objectUnderTest).handleTableMapEvent(binlogEvent); } + @Test + void test_stopClient() throws IOException { + objectUnderTest.stopClient(); + + InOrder inOrder = inOrder(binaryLogClient, eventListnerExecutorService); + inOrder.verify(binaryLogClient).disconnect(); + inOrder.verify(binaryLogClient).unregisterEventListener(objectUnderTest); + inOrder.verify(eventListnerExecutorService).shutdownNow(); + } + @ParameterizedTest @EnumSource(names = {"WRITE_ROWS", "EXT_WRITE_ROWS"}) void test_given_WriteRows_event_then_calls_correct_handler(EventType eventType) {