diff --git a/framework-docs/modules/ROOT/pages/integration/scheduling.adoc b/framework-docs/modules/ROOT/pages/integration/scheduling.adoc index 415e043920a8..c6e7af27a1ef 100644 --- a/framework-docs/modules/ROOT/pages/integration/scheduling.adoc +++ b/framework-docs/modules/ROOT/pages/integration/scheduling.adoc @@ -62,22 +62,27 @@ The variants that Spring provides are as follows: `ConcurrentTaskExecutor` directly. However, if the `ThreadPoolTaskExecutor` is not flexible enough for your needs, `ConcurrentTaskExecutor` is an alternative. * `ThreadPoolTaskExecutor`: - This implementation is most commonly used. It exposes bean properties for - configuring a `java.util.concurrent.ThreadPoolExecutor` and wraps it in a `TaskExecutor`. - If you need to adapt to a different kind of `java.util.concurrent.Executor`, we - recommend that you use a `ConcurrentTaskExecutor` instead. + This implementation is most commonly used. It exposes bean properties for configuring + a `java.util.concurrent.ThreadPoolExecutor` and wraps it in a `TaskExecutor`. + If you need to adapt to a different kind of `java.util.concurrent.Executor`, + we recommend that you use a `ConcurrentTaskExecutor` instead. * `DefaultManagedTaskExecutor`: This implementation uses a JNDI-obtained `ManagedExecutorService` in a JSR-236 compatible runtime environment (such as a Jakarta EE application server), replacing a CommonJ WorkManager for that purpose. +As of 6.1, `ThreadPoolTaskExecutor` provides a pause/resume capability and graceful +shutdown through Spring's lifecycle management. There is also a new "virtualThreads" +option on `SimpleAsyncTaskExecutor` which is aligned with JDK 21's Virtual Threads, +as well as a graceful shutdown capability for `SimpleAsyncTaskExecutor` as well. + [[scheduling-task-executor-usage]] === Using a `TaskExecutor` -Spring's `TaskExecutor` implementations are used as simple JavaBeans. In the following example, -we define a bean that uses the `ThreadPoolTaskExecutor` to asynchronously print -out a set of messages: +Spring's `TaskExecutor` implementations are commonly used with dependency injection. +In the following example, we define a bean that uses the `ThreadPoolTaskExecutor` +to asynchronously print out a set of messages: [source,java,indent=0,subs="verbatim,quotes"] ---- @@ -227,8 +232,8 @@ fixed delay, those methods should be used directly whenever possible. The value `PeriodicTrigger` implementation is that you can use it within components that rely on the `Trigger` abstraction. For example, it may be convenient to allow periodic triggers, cron-based triggers, and even custom trigger implementations to be used interchangeably. -Such a component could take advantage of dependency injection so that you can configure such `Triggers` -externally and, therefore, easily modify or extend them. +Such a component could take advantage of dependency injection so that you can configure +such `Triggers` externally and, therefore, easily modify or extend them. [[scheduling-task-scheduler-implementations]] @@ -238,10 +243,8 @@ As with Spring's `TaskExecutor` abstraction, the primary benefit of the `TaskSch arrangement is that an application's scheduling needs are decoupled from the deployment environment. This abstraction level is particularly relevant when deploying to an application server environment where threads should not be created directly by the -application itself. For such scenarios, Spring provides a `TimerManagerTaskScheduler` -that delegates to a CommonJ `TimerManager` on WebLogic or WebSphere as well as a more recent -`DefaultManagedTaskScheduler` that delegates to a JSR-236 `ManagedScheduledExecutorService` -in a Jakarta EE environment. Both are typically configured with a JNDI lookup. +application itself. For such scenarios, Spring provides a `DefaultManagedTaskScheduler` +that delegates to a JSR-236 `ManagedScheduledExecutorService` in a Jakarta EE environment. Whenever external thread management is not a requirement, a simpler alternative is a local `ScheduledExecutorService` setup within the application, which can be adapted @@ -251,6 +254,11 @@ to provide common bean-style configuration along the lines of `ThreadPoolTaskExe These variants work perfectly fine for locally embedded thread pool setups in lenient application server environments, as well -- in particular on Tomcat and Jetty. +As of 6.1, `ThreadPoolTaskScheduler` provides a pause/resume capability and graceful +shutdown through Spring's lifecycle management. There is also a new option called +`SimpleAsyncTaskScheduler` which is aligned with JDK 21's Virtual Threads, using a +single scheduler thread but firing up a new thread for every scheduled task execution. + [[scheduling-annotation-support]] diff --git a/spring-context/src/main/java/org/springframework/scheduling/concurrent/SimpleAsyncTaskScheduler.java b/spring-context/src/main/java/org/springframework/scheduling/concurrent/SimpleAsyncTaskScheduler.java index 8a8b7f53102d..8fb44864f6e4 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/concurrent/SimpleAsyncTaskScheduler.java +++ b/spring-context/src/main/java/org/springframework/scheduling/concurrent/SimpleAsyncTaskScheduler.java @@ -44,7 +44,19 @@ * A simple implementation of Spring's {@link TaskScheduler} interface, using * a single scheduler thread and executing every scheduled task in an individual * separate thread. This is an attractive choice with virtual threads on JDK 21, - * so it is commonly used with {@link #setVirtualThreads setVirtualThreads(true)}. + * expecting common usage with {@link #setVirtualThreads setVirtualThreads(true)}. + * + *

Supports a graceful shutdown through {@link #setTaskTerminationTimeout}, + * at the expense of task tracking overhead per execution thread at runtime. + * Supports limiting concurrent threads through {@link #setConcurrencyLimit}. + * By default, the number of concurrent task executions is unlimited. + * This allows for dynamic concurrency of scheduled task executions, in contrast + * to {@link ThreadPoolTaskScheduler} which requires a fixed pool size. + * + *

NOTE: This implementation does not reuse threads! Consider a + * thread-pooling TaskScheduler implementation instead, in particular for + * scheduling a large number of short-lived tasks. Alternatively, on JDK 21, + * consider setting {@link #setVirtualThreads} to {@code true}. * *

Extends {@link SimpleAsyncTaskExecutor} and can serve as a fully capable * replacement for it, e.g. as a single shared instance serving as a @@ -64,13 +76,14 @@ * @author Juergen Hoeller * @since 6.1 * @see #setVirtualThreads - * @see #setTargetTaskExecutor + * @see #setTaskTerminationTimeout + * @see #setConcurrencyLimit * @see SimpleAsyncTaskExecutor * @see ThreadPoolTaskScheduler */ @SuppressWarnings("serial") public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements TaskScheduler, - ApplicationContextAware, SmartLifecycle, ApplicationListener, AutoCloseable { + ApplicationContextAware, SmartLifecycle, ApplicationListener { private static final TimeUnit NANO = TimeUnit.NANOSECONDS; @@ -275,6 +288,7 @@ public void close() { future.cancel(true); } } + super.close(); } } diff --git a/spring-context/src/test/java/org/springframework/scheduling/annotation/EnableSchedulingTests.java b/spring-context/src/test/java/org/springframework/scheduling/annotation/EnableSchedulingTests.java index 73e7e8ec870f..aa3a1d5f063d 100644 --- a/spring-context/src/test/java/org/springframework/scheduling/annotation/EnableSchedulingTests.java +++ b/spring-context/src/test/java/org/springframework/scheduling/annotation/EnableSchedulingTests.java @@ -25,6 +25,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -33,6 +34,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.support.PropertySourcesPlaceholderConfigurer; +import org.springframework.core.task.TaskExecutor; import org.springframework.core.testfixture.EnabledForTestGroups; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; @@ -66,6 +68,10 @@ public void tearDown() { } + /* + * Tests compatibility between default executor in TaskSchedulerRouter + * and explicit ThreadPoolTaskScheduler in configuration subclass. + */ @ParameterizedTest @ValueSource(classes = {FixedRateTaskConfig.class, FixedRateTaskConfigSubclass.class}) @EnabledForTestGroups(LONG_RUNNING) @@ -77,8 +83,14 @@ public void withFixedRateTask(Class configClass) throws InterruptedException assertThat(ctx.getBean(AtomicInteger.class).get()).isGreaterThanOrEqualTo(10); } + /* + * Tests compatibility between SimpleAsyncTaskScheduler in regular configuration + * and explicit ThreadPoolTaskScheduler in configuration subclass. This includes + * pause/resume behavior and a controlled shutdown with a 1s termination timeout. + */ @ParameterizedTest @ValueSource(classes = {ExplicitSchedulerConfig.class, ExplicitSchedulerConfigSubclass.class}) + @Timeout(2) // should actually complete within 1s @EnabledForTestGroups(LONG_RUNNING) public void withExplicitScheduler(Class configClass) throws InterruptedException { ctx = new AnnotationConfigApplicationContext(configClass); @@ -96,9 +108,35 @@ public void withExplicitScheduler(Class configClass) throws InterruptedExcept int count3 = ctx.getBean(AtomicInteger.class).get(); assertThat(count3).isGreaterThanOrEqualTo(20); + TaskExecutor executor = ctx.getBean(TaskExecutor.class); + AtomicInteger count = new AtomicInteger(0); + for (int i = 0; i < 2; i++) { + executor.execute(() -> { + try { + Thread.sleep(10000); // try to break test timeout + } + catch (InterruptedException ex) { + // expected during executor shutdown + try { + Thread.sleep(500); + // should get here within task termination timeout (1000) + count.incrementAndGet(); + } + catch (InterruptedException ex2) { + // not expected + } + } + }); + } + assertThat(ctx.getBean(ExplicitSchedulerConfig.class).threadName).startsWith("explicitScheduler-"); - assertThat(Arrays.asList(ctx.getDefaultListableBeanFactory().getDependentBeans("myTaskScheduler")).contains( - TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)).isTrue(); + assertThat(Arrays.asList(ctx.getDefaultListableBeanFactory().getDependentBeans("myTaskScheduler")) + .contains(TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)).isTrue(); + + // Include executor shutdown in test timeout (2 seconds), + // expecting interruption of the sleeping thread... + ctx.close(); + assertThat(count.intValue()).isEqualTo(2); } @Test @@ -226,6 +264,11 @@ public void task() { @Configuration static class FixedRateTaskConfigSubclass extends FixedRateTaskConfig { + + @Bean + public TaskScheduler taskScheduler() { + return new ThreadPoolTaskScheduler(); + } } @@ -239,6 +282,7 @@ static class ExplicitSchedulerConfig { public TaskScheduler myTaskScheduler() { SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler(); scheduler.setThreadNamePrefix("explicitScheduler-"); + scheduler.setTaskTerminationTimeout(1000); return scheduler; } @@ -263,6 +307,8 @@ static class ExplicitSchedulerConfigSubclass extends ExplicitSchedulerConfig { public TaskScheduler myTaskScheduler() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setThreadNamePrefix("explicitScheduler-"); + scheduler.setAwaitTerminationMillis(1000); + scheduler.setPoolSize(2); return scheduler; } } @@ -437,6 +483,7 @@ public void task() { public TaskScheduler taskScheduler1() { SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler(); scheduler.setThreadNamePrefix("explicitScheduler1"); + scheduler.setConcurrencyLimit(1); return scheduler; } @@ -478,6 +525,7 @@ public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { public TaskScheduler taskScheduler1() { SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler(); scheduler.setThreadNamePrefix("explicitScheduler1-"); + scheduler.setConcurrencyLimit(1); return scheduler; } @@ -508,6 +556,7 @@ public ThreadAwareWorker worker() { public TaskScheduler taskScheduler1() { SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler(); scheduler.setThreadNamePrefix("explicitScheduler1-"); + scheduler.setConcurrencyLimit(1); return scheduler; } diff --git a/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java b/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java index c4fc4374f9f3..ce7eee8c1296 100644 --- a/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java +++ b/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java @@ -17,7 +17,10 @@ package org.springframework.core.task; import java.io.Serializable; +import java.util.Collections; +import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.ThreadFactory; @@ -31,10 +34,12 @@ /** * {@link TaskExecutor} implementation that fires up a new Thread for each task, - * executing it asynchronously. Supports a virtual thread option on JDK 21. + * executing it asynchronously. Provides a virtual thread option on JDK 21. * - *

Supports limiting concurrent threads through the "concurrencyLimit" - * bean property. By default, the number of concurrent threads is unlimited. + *

Supports a graceful shutdown through {@link #setTaskTerminationTimeout}, + * at the expense of task tracking overhead per execution thread at runtime. + * Supports limiting concurrent threads through {@link #setConcurrencyLimit}. + * By default, the number of concurrent task executions is unlimited. * *

NOTE: This implementation does not reuse threads! Consider a * thread-pooling TaskExecutor implementation instead, in particular for @@ -44,13 +49,14 @@ * @author Juergen Hoeller * @since 2.0 * @see #setVirtualThreads + * @see #setTaskTerminationTimeout * @see #setConcurrencyLimit - * @see SyncTaskExecutor + * @see org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler * @see org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor */ @SuppressWarnings({"serial", "deprecation"}) public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator - implements AsyncListenableTaskExecutor, Serializable { + implements AsyncListenableTaskExecutor, Serializable, AutoCloseable { /** * Permit any number of concurrent invocations: that is, don't throttle concurrency. @@ -77,6 +83,13 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator @Nullable private TaskDecorator taskDecorator; + private long taskTerminationTimeout; + + @Nullable + private Set activeThreads; + + private volatile boolean active = true; + /** * Create a new SimpleAsyncTaskExecutor with default thread name prefix. @@ -147,33 +160,62 @@ public final ThreadFactory getThreadFactory() { * have to cast it and call {@code Future#get} to evaluate exceptions. * @since 4.3 */ - public final void setTaskDecorator(TaskDecorator taskDecorator) { + public void setTaskDecorator(TaskDecorator taskDecorator) { this.taskDecorator = taskDecorator; } /** - * Set the maximum number of parallel accesses allowed. - * -1 indicates no concurrency limit at all. - *

In principle, this limit can be changed at runtime, - * although it is generally designed as a config time setting. - * NOTE: Do not switch between -1 and any concrete limit at runtime, - * as this will lead to inconsistent concurrency counts: A limit - * of -1 effectively turns off concurrency counting completely. + * Specify a timeout for task termination when closing this executor. + * The default is 0, not waiting for task termination at all. + *

Note that a concrete >0 timeout specified here will lead to the + * wrapping of every submitted task into a task-tracking runnable which + * involves considerable overhead in case of a high number of tasks. + * However, for a modest level of submissions with longer-running + * tasks, this is feasible in order to arrive at a graceful shutdown. + * @param timeout the timeout in milliseconds + * @since 6.1 + * @see #close() + * @see org.springframework.scheduling.concurrent.ExecutorConfigurationSupport#setAwaitTerminationMillis + */ + public void setTaskTerminationTimeout(long timeout) { + Assert.isTrue(timeout >= 0, "Timeout value must be >=0"); + this.taskTerminationTimeout = timeout; + this.activeThreads = (timeout > 0 ? Collections.newSetFromMap(new ConcurrentHashMap<>()) : null); + } + + /** + * Return whether this executor is still active, i.e. not closed yet, + * and therefore accepts further task submissions. Otherwise, it is + * either in the task termination phase or entirely shut down already. + * @since 6.1 + * @see #setTaskTerminationTimeout + * @see #close() + */ + public boolean isActive() { + return this.active; + } + + /** + * Set the maximum number of parallel task executions allowed. + * The default of -1 indicates no concurrency limit at all. + *

This is the equivalent of a maximum pool size in a thread pool, + * preventing temporary overload of the thread management system. * @see #UNBOUNDED_CONCURRENCY + * @see org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor#setMaxPoolSize */ public void setConcurrencyLimit(int concurrencyLimit) { this.concurrencyThrottle.setConcurrencyLimit(concurrencyLimit); } /** - * Return the maximum number of parallel accesses allowed. + * Return the maximum number of parallel task executions allowed. */ public final int getConcurrencyLimit() { return this.concurrencyThrottle.getConcurrencyLimit(); } /** - * Return whether this throttle is currently active. + * Return whether the concurrency throttle is currently active. * @return {@code true} if the concurrency limit for this instance is active * @see #getConcurrencyLimit() * @see #setConcurrencyLimit @@ -207,10 +249,17 @@ public void execute(Runnable task) { @Override public void execute(Runnable task, long startTimeout) { Assert.notNull(task, "Runnable must not be null"); + if (!isActive()) { + throw new TaskRejectedException(getClass().getSimpleName() + " has been closed already"); + } + Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task); if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) { this.concurrencyThrottle.beforeAccess(); - doExecute(new ConcurrencyThrottlingRunnable(taskToUse)); + doExecute(new TaskTrackingRunnable(taskToUse)); + } + else if (this.activeThreads != null) { + doExecute(new TaskTrackingRunnable(taskToUse)); } else { doExecute(taskToUse); @@ -278,6 +327,33 @@ protected Thread newThread(Runnable task) { } } + /** + * This close methods tracks the termination of active threads if a concrete + * {@link #setTaskTerminationTimeout task termination timeout} has been set. + * Otherwise, it is not necessary to close this executor. + * @since 6.1 + */ + @Override + public void close() { + if (this.active) { + this.active = false; + Set threads = this.activeThreads; + if (threads != null) { + threads.forEach(Thread::interrupt); + synchronized (threads) { + try { + if (!threads.isEmpty()) { + threads.wait(this.taskTerminationTimeout); + } + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + } + } + } + /** * Subclass of the general ConcurrencyThrottleSupport class, @@ -299,23 +375,40 @@ protected void afterAccess() { /** - * This Runnable calls {@code afterAccess()} after the - * target Runnable has finished its execution. + * Decorates a target task with active thread tracking + * and concurrency throttle management, if necessary. */ - private class ConcurrencyThrottlingRunnable implements Runnable { + private class TaskTrackingRunnable implements Runnable { - private final Runnable target; + private final Runnable task; - public ConcurrencyThrottlingRunnable(Runnable target) { - this.target = target; + public TaskTrackingRunnable(Runnable task) { + Assert.notNull(task, "Task must not be null"); + this.task = task; } @Override public void run() { + Set threads = activeThreads; + Thread thread = null; + if (threads != null) { + thread = Thread.currentThread(); + threads.add(thread); + } try { - this.target.run(); + this.task.run(); } finally { + if (threads != null) { + threads.remove(thread); + if (!isActive()) { + synchronized (threads) { + if (threads.isEmpty()) { + threads.notify(); + } + } + } + } concurrencyThrottle.afterAccess(); } }