Skip to content

Commit

Permalink
ENH: add shutdown into extension plugin (#4924)
Browse files Browse the repository at this point in the history
* ENH: add shutdown into extension plugin

Signed-off-by: George Chen <qchea@amazon.com>
  • Loading branch information
chenqi0805 committed Sep 10, 2024
1 parent 0d121bc commit af9cab8
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,10 @@ public interface ExtensionPlugin {
* @param extensionPoints The {@link ExtensionPoints} wherein the extension can extend behaviors.
*/
void apply(ExtensionPoints extensionPoints);

/**
* Close resources used by the extension.
*/
default void shutdown() {
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.opensearch.dataprepper.model.plugin;

import org.junit.jupiter.api.Test;

public class ExtensionPluginTest {

@Test
void testShutdown() {
final ExtensionPlugin extensionPlugin = new ExtensionPluginTestImpl();
extensionPlugin.shutdown();
}

static class ExtensionPluginTestImpl implements ExtensionPlugin {

@Override
public void apply(ExtensionPoints extensionPoints) {

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@
import org.opensearch.dataprepper.model.plugin.ExtensionPlugin;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.inject.Named;
import java.util.Collections;
import java.util.List;

@Named("extensionsApplier")
class ExtensionsApplier {
private final DataPrepperExtensionPoints dataPrepperExtensionPoints;
private final ExtensionLoader extensionLoader;
private List<? extends ExtensionPlugin> loadedExtensionPlugins = Collections.emptyList();

@Inject
ExtensionsApplier(
Expand All @@ -27,10 +30,15 @@ class ExtensionsApplier {

@PostConstruct
void applyExtensions() {
final List<? extends ExtensionPlugin> extensionPlugins = extensionLoader.loadExtensions();
loadedExtensionPlugins = extensionLoader.loadExtensions();

for (ExtensionPlugin extensionPlugin : extensionPlugins) {
for (ExtensionPlugin extensionPlugin : loadedExtensionPlugins) {
extensionPlugin.apply(dataPrepperExtensionPoints);
}
}

@PreDestroy
public void shutdownExtensions() {
loadedExtensionPlugins.forEach(ExtensionPlugin::shutdown);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,14 @@ void applyExtensions_with_empty_extensions_is_ok() {

createObjectUnderTest().applyExtensions();
}

@Test
void shutDownExtensions_invokes_extension_plugin_shutdown() {
final ExtensionPlugin extensionPlugin = mock(ExtensionPlugin.class);
when(extensionLoader.loadExtensions()).thenReturn((List) List.of(extensionPlugin));
final ExtensionsApplier objectUnderTest = createObjectUnderTest();
objectUnderTest.applyExtensions();
objectUnderTest.shutdownExtensions();
verify(extensionPlugin).shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ public AwsSecretPlugin(final AwsSecretPluginConfig awsSecretPluginConfig) {
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
pluginMetrics = PluginMetrics.fromNames("secrets", "aws");
submitSecretsRefreshJobs(awsSecretPluginConfig, secretsSupplier);
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
} else {
pluginConfigValueTranslator = null;
}
Expand All @@ -70,7 +69,8 @@ private void submitSecretsRefreshJobs(final AwsSecretPluginConfig awsSecretPlugi
});
}

void shutdown() {
@Override
public void shutdown() {
if (scheduledExecutorService != null) {
LOG.info("Shutting down secrets refreshing tasks.");
scheduledExecutorService.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@ class AwsSecretPluginIT {
@Mock
private ScheduledExecutorService scheduledExecutorService;

@Mock
private Runtime runtime;

@Captor
private ArgumentCaptor<Long> initialDelayCaptor;

Expand All @@ -92,12 +89,9 @@ void testInitializationWithNonNullConfig() {
when(awsSecretManagerConfiguration.createGetSecretValueRequest()).thenReturn(getSecretValueRequest);
when(secretsManagerClient.getSecretValue(eq(getSecretValueRequest))).thenReturn(getSecretValueResponse);
when(getSecretValueResponse.secretString()).thenReturn(UUID.randomUUID().toString());
try (final MockedStatic<Executors> executorsMockedStatic = mockStatic(Executors.class);
final MockedStatic<Runtime> runtimeMockedStatic = mockStatic(Runtime.class)
) {
try (final MockedStatic<Executors> executorsMockedStatic = mockStatic(Executors.class)) {
executorsMockedStatic.when(Executors::newSingleThreadScheduledExecutor)
.thenReturn(scheduledExecutorService);
runtimeMockedStatic.when(Runtime::getRuntime).thenReturn(runtime);
objectUnderTest = new AwsSecretPlugin(awsSecretPluginConfig);
objectUnderTest.apply(extensionPoints);
}
Expand All @@ -118,7 +112,6 @@ void testInitializationWithNonNullConfig() {
any(), initialDelayCaptor.capture(), periodCaptor.capture(), eq(TimeUnit.SECONDS));
assertThat(initialDelayCaptor.getValue() >= testInterval.toSeconds(), is(true));
assertThat(periodCaptor.getValue(), equalTo(testInterval.toSeconds()));
verify(runtime).addShutdownHook(any());
}

@Test
Expand All @@ -130,12 +123,9 @@ void testInitializationWithDisableRefresh() {
when(awsSecretManagerConfiguration.createGetSecretValueRequest()).thenReturn(getSecretValueRequest);
when(secretsManagerClient.getSecretValue(eq(getSecretValueRequest))).thenReturn(getSecretValueResponse);
when(getSecretValueResponse.secretString()).thenReturn(UUID.randomUUID().toString());
try (final MockedStatic<Executors> executorsMockedStatic = mockStatic(Executors.class);
final MockedStatic<Runtime> runtimeMockedStatic = mockStatic(Runtime.class)
) {
try (final MockedStatic<Executors> executorsMockedStatic = mockStatic(Executors.class)) {
executorsMockedStatic.when(Executors::newSingleThreadScheduledExecutor)
.thenReturn(scheduledExecutorService);
runtimeMockedStatic.when(Runtime::getRuntime).thenReturn(runtime);
objectUnderTest = new AwsSecretPlugin(awsSecretPluginConfig);
objectUnderTest.apply(extensionPoints);
}
Expand All @@ -153,7 +143,6 @@ void testInitializationWithDisableRefresh() {
actualExtensionProviders.get(1).provideInstance(context);
assertThat(optionalPluginConfigPublisher.isPresent(), is(true));
verifyNoInteractions(scheduledExecutorService);
verify(runtime).addShutdownHook(any());
}

@Test
Expand All @@ -178,12 +167,9 @@ void testInitializationWithNullConfig() {
void testShutdownAwaitTerminationSuccess() throws InterruptedException {
when(awsSecretPluginConfig.getAwsSecretManagerConfigurationMap()).thenReturn(
Collections.emptyMap());
try (final MockedStatic<Executors> executorsMockedStatic = mockStatic(Executors.class);
final MockedStatic<Runtime> runtimeMockedStatic = mockStatic(Runtime.class)
) {
try (final MockedStatic<Executors> executorsMockedStatic = mockStatic(Executors.class)) {
executorsMockedStatic.when(Executors::newSingleThreadScheduledExecutor)
.thenReturn(scheduledExecutorService);
runtimeMockedStatic.when(Runtime::getRuntime).thenReturn(runtime);
objectUnderTest = new AwsSecretPlugin(awsSecretPluginConfig);
}
when(scheduledExecutorService.awaitTermination(anyLong(), any(TimeUnit.class))).thenReturn(true);
Expand All @@ -199,12 +185,9 @@ void testShutdownAwaitTerminationSuccess() throws InterruptedException {
void testShutdownAwaitTerminationTimeout() throws InterruptedException {
when(awsSecretPluginConfig.getAwsSecretManagerConfigurationMap()).thenReturn(
Collections.emptyMap());
try (final MockedStatic<Executors> executorsMockedStatic = mockStatic(Executors.class);
final MockedStatic<Runtime> runtimeMockedStatic = mockStatic(Runtime.class)
) {
try (final MockedStatic<Executors> executorsMockedStatic = mockStatic(Executors.class)) {
executorsMockedStatic.when(Executors::newSingleThreadScheduledExecutor)
.thenReturn(scheduledExecutorService);
runtimeMockedStatic.when(Runtime::getRuntime).thenReturn(runtime);
objectUnderTest = new AwsSecretPlugin(awsSecretPluginConfig);
}
when(scheduledExecutorService.awaitTermination(anyLong(), any(TimeUnit.class))).thenReturn(false);
Expand All @@ -220,12 +203,9 @@ void testShutdownAwaitTerminationTimeout() throws InterruptedException {
void testShutdownAwaitTerminationInterrupted() throws InterruptedException {
when(awsSecretPluginConfig.getAwsSecretManagerConfigurationMap()).thenReturn(
Collections.emptyMap());
try (final MockedStatic<Executors> executorsMockedStatic = mockStatic(Executors.class);
final MockedStatic<Runtime> runtimeMockedStatic = mockStatic(Runtime.class)
) {
try (final MockedStatic<Executors> executorsMockedStatic = mockStatic(Executors.class)) {
executorsMockedStatic.when(Executors::newSingleThreadScheduledExecutor)
.thenReturn(scheduledExecutorService);
runtimeMockedStatic.when(Runtime::getRuntime).thenReturn(runtime);
objectUnderTest = new AwsSecretPlugin(awsSecretPluginConfig);
}
when(scheduledExecutorService.awaitTermination(anyLong(), any(TimeUnit.class)))
Expand All @@ -240,12 +220,9 @@ void testShutdownAwaitTerminationInterrupted() throws InterruptedException {

@Test
void testShutdownWithNullScheduledExecutorService() {
try (final MockedStatic<Executors> executorsMockedStatic = mockStatic(Executors.class);
final MockedStatic<Runtime> runtimeMockedStatic = mockStatic(Runtime.class)
) {
try (final MockedStatic<Executors> executorsMockedStatic = mockStatic(Executors.class)) {
executorsMockedStatic.when(Executors::newSingleThreadScheduledExecutor)
.thenReturn(scheduledExecutorService);
runtimeMockedStatic.when(Runtime::getRuntime).thenReturn(runtime);
objectUnderTest = new AwsSecretPlugin(null);
}
objectUnderTest.shutdown();
Expand Down

0 comments on commit af9cab8

Please sign in to comment.