diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/ExtensionPlugin.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/ExtensionPlugin.java index 9edf55c454..51339ca9c1 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/ExtensionPlugin.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/ExtensionPlugin.java @@ -20,4 +20,10 @@ public interface ExtensionPlugin { * @param extensionPoints The {@link ExtensionPoints} wherein the extension can extend behaviors. */ void apply(ExtensionPoints extensionPoints); + + /** + * Close resources used by the extension. + */ + default void shutdown() { + }; } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/plugin/ExtensionPluginTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/plugin/ExtensionPluginTest.java new file mode 100644 index 0000000000..10c3b0102e --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/plugin/ExtensionPluginTest.java @@ -0,0 +1,20 @@ +package org.opensearch.dataprepper.model.plugin; + +import org.junit.jupiter.api.Test; + +public class ExtensionPluginTest { + + @Test + void testShutdown() { + final ExtensionPlugin extensionPlugin = new ExtensionPluginTestImpl(); + extensionPlugin.shutdown(); + } + + static class ExtensionPluginTestImpl implements ExtensionPlugin { + + @Override + public void apply(ExtensionPoints extensionPoints) { + + } + } +} diff --git a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ExtensionsApplier.java b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ExtensionsApplier.java index b2bd9ffa6f..a55445003f 100644 --- a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ExtensionsApplier.java +++ b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ExtensionsApplier.java @@ -8,14 +8,17 @@ import org.opensearch.dataprepper.model.plugin.ExtensionPlugin; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import javax.inject.Inject; import javax.inject.Named; +import java.util.Collections; import java.util.List; @Named("extensionsApplier") class ExtensionsApplier { private final DataPrepperExtensionPoints dataPrepperExtensionPoints; private final ExtensionLoader extensionLoader; + private List loadedExtensionPlugins = Collections.emptyList(); @Inject ExtensionsApplier( @@ -27,10 +30,15 @@ class ExtensionsApplier { @PostConstruct void applyExtensions() { - final List extensionPlugins = extensionLoader.loadExtensions(); + loadedExtensionPlugins = extensionLoader.loadExtensions(); - for (ExtensionPlugin extensionPlugin : extensionPlugins) { + for (ExtensionPlugin extensionPlugin : loadedExtensionPlugins) { extensionPlugin.apply(dataPrepperExtensionPoints); } } + + @PreDestroy + public void shutdownExtensions() { + loadedExtensionPlugins.forEach(ExtensionPlugin::shutdown); + } } diff --git a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/ExtensionsApplierTest.java b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/ExtensionsApplierTest.java index 06c9ff2809..d3eb9a39cd 100644 --- a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/ExtensionsApplierTest.java +++ b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/ExtensionsApplierTest.java @@ -55,4 +55,14 @@ void applyExtensions_with_empty_extensions_is_ok() { createObjectUnderTest().applyExtensions(); } + + @Test + void shutDownExtensions_invokes_extension_plugin_shutdown() { + final ExtensionPlugin extensionPlugin = mock(ExtensionPlugin.class); + when(extensionLoader.loadExtensions()).thenReturn((List) List.of(extensionPlugin)); + final ExtensionsApplier objectUnderTest = createObjectUnderTest(); + objectUnderTest.applyExtensions(); + objectUnderTest.shutdownExtensions(); + verify(extensionPlugin).shutdown(); + } } \ No newline at end of file diff --git a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretPlugin.java b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretPlugin.java index 552106adb9..288f6d6e2b 100644 --- a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretPlugin.java +++ b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretPlugin.java @@ -43,7 +43,6 @@ public AwsSecretPlugin(final AwsSecretPluginConfig awsSecretPluginConfig) { scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); pluginMetrics = PluginMetrics.fromNames("secrets", "aws"); submitSecretsRefreshJobs(awsSecretPluginConfig, secretsSupplier); - Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown)); } else { pluginConfigValueTranslator = null; } @@ -70,7 +69,8 @@ private void submitSecretsRefreshJobs(final AwsSecretPluginConfig awsSecretPlugi }); } - void shutdown() { + @Override + public void shutdown() { if (scheduledExecutorService != null) { LOG.info("Shutting down secrets refreshing tasks."); scheduledExecutorService.shutdown(); diff --git a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretPluginIT.java b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretPluginIT.java index 2f624611a9..a9f434d3e8 100644 --- a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretPluginIT.java +++ b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretPluginIT.java @@ -71,9 +71,6 @@ class AwsSecretPluginIT { @Mock private ScheduledExecutorService scheduledExecutorService; - @Mock - private Runtime runtime; - @Captor private ArgumentCaptor initialDelayCaptor; @@ -92,12 +89,9 @@ void testInitializationWithNonNullConfig() { when(awsSecretManagerConfiguration.createGetSecretValueRequest()).thenReturn(getSecretValueRequest); when(secretsManagerClient.getSecretValue(eq(getSecretValueRequest))).thenReturn(getSecretValueResponse); when(getSecretValueResponse.secretString()).thenReturn(UUID.randomUUID().toString()); - try (final MockedStatic executorsMockedStatic = mockStatic(Executors.class); - final MockedStatic runtimeMockedStatic = mockStatic(Runtime.class) - ) { + try (final MockedStatic executorsMockedStatic = mockStatic(Executors.class)) { executorsMockedStatic.when(Executors::newSingleThreadScheduledExecutor) .thenReturn(scheduledExecutorService); - runtimeMockedStatic.when(Runtime::getRuntime).thenReturn(runtime); objectUnderTest = new AwsSecretPlugin(awsSecretPluginConfig); objectUnderTest.apply(extensionPoints); } @@ -118,7 +112,6 @@ void testInitializationWithNonNullConfig() { any(), initialDelayCaptor.capture(), periodCaptor.capture(), eq(TimeUnit.SECONDS)); assertThat(initialDelayCaptor.getValue() >= testInterval.toSeconds(), is(true)); assertThat(periodCaptor.getValue(), equalTo(testInterval.toSeconds())); - verify(runtime).addShutdownHook(any()); } @Test @@ -130,12 +123,9 @@ void testInitializationWithDisableRefresh() { when(awsSecretManagerConfiguration.createGetSecretValueRequest()).thenReturn(getSecretValueRequest); when(secretsManagerClient.getSecretValue(eq(getSecretValueRequest))).thenReturn(getSecretValueResponse); when(getSecretValueResponse.secretString()).thenReturn(UUID.randomUUID().toString()); - try (final MockedStatic executorsMockedStatic = mockStatic(Executors.class); - final MockedStatic runtimeMockedStatic = mockStatic(Runtime.class) - ) { + try (final MockedStatic executorsMockedStatic = mockStatic(Executors.class)) { executorsMockedStatic.when(Executors::newSingleThreadScheduledExecutor) .thenReturn(scheduledExecutorService); - runtimeMockedStatic.when(Runtime::getRuntime).thenReturn(runtime); objectUnderTest = new AwsSecretPlugin(awsSecretPluginConfig); objectUnderTest.apply(extensionPoints); } @@ -153,7 +143,6 @@ void testInitializationWithDisableRefresh() { actualExtensionProviders.get(1).provideInstance(context); assertThat(optionalPluginConfigPublisher.isPresent(), is(true)); verifyNoInteractions(scheduledExecutorService); - verify(runtime).addShutdownHook(any()); } @Test @@ -178,12 +167,9 @@ void testInitializationWithNullConfig() { void testShutdownAwaitTerminationSuccess() throws InterruptedException { when(awsSecretPluginConfig.getAwsSecretManagerConfigurationMap()).thenReturn( Collections.emptyMap()); - try (final MockedStatic executorsMockedStatic = mockStatic(Executors.class); - final MockedStatic runtimeMockedStatic = mockStatic(Runtime.class) - ) { + try (final MockedStatic executorsMockedStatic = mockStatic(Executors.class)) { executorsMockedStatic.when(Executors::newSingleThreadScheduledExecutor) .thenReturn(scheduledExecutorService); - runtimeMockedStatic.when(Runtime::getRuntime).thenReturn(runtime); objectUnderTest = new AwsSecretPlugin(awsSecretPluginConfig); } when(scheduledExecutorService.awaitTermination(anyLong(), any(TimeUnit.class))).thenReturn(true); @@ -199,12 +185,9 @@ void testShutdownAwaitTerminationSuccess() throws InterruptedException { void testShutdownAwaitTerminationTimeout() throws InterruptedException { when(awsSecretPluginConfig.getAwsSecretManagerConfigurationMap()).thenReturn( Collections.emptyMap()); - try (final MockedStatic executorsMockedStatic = mockStatic(Executors.class); - final MockedStatic runtimeMockedStatic = mockStatic(Runtime.class) - ) { + try (final MockedStatic executorsMockedStatic = mockStatic(Executors.class)) { executorsMockedStatic.when(Executors::newSingleThreadScheduledExecutor) .thenReturn(scheduledExecutorService); - runtimeMockedStatic.when(Runtime::getRuntime).thenReturn(runtime); objectUnderTest = new AwsSecretPlugin(awsSecretPluginConfig); } when(scheduledExecutorService.awaitTermination(anyLong(), any(TimeUnit.class))).thenReturn(false); @@ -220,12 +203,9 @@ void testShutdownAwaitTerminationTimeout() throws InterruptedException { void testShutdownAwaitTerminationInterrupted() throws InterruptedException { when(awsSecretPluginConfig.getAwsSecretManagerConfigurationMap()).thenReturn( Collections.emptyMap()); - try (final MockedStatic executorsMockedStatic = mockStatic(Executors.class); - final MockedStatic runtimeMockedStatic = mockStatic(Runtime.class) - ) { + try (final MockedStatic executorsMockedStatic = mockStatic(Executors.class)) { executorsMockedStatic.when(Executors::newSingleThreadScheduledExecutor) .thenReturn(scheduledExecutorService); - runtimeMockedStatic.when(Runtime::getRuntime).thenReturn(runtime); objectUnderTest = new AwsSecretPlugin(awsSecretPluginConfig); } when(scheduledExecutorService.awaitTermination(anyLong(), any(TimeUnit.class))) @@ -240,12 +220,9 @@ void testShutdownAwaitTerminationInterrupted() throws InterruptedException { @Test void testShutdownWithNullScheduledExecutorService() { - try (final MockedStatic executorsMockedStatic = mockStatic(Executors.class); - final MockedStatic runtimeMockedStatic = mockStatic(Runtime.class) - ) { + try (final MockedStatic executorsMockedStatic = mockStatic(Executors.class)) { executorsMockedStatic.when(Executors::newSingleThreadScheduledExecutor) .thenReturn(scheduledExecutorService); - runtimeMockedStatic.when(Runtime::getRuntime).thenReturn(runtime); objectUnderTest = new AwsSecretPlugin(null); } objectUnderTest.shutdown();