From 32ae3bcf98eaa0c27fc9f3e3b6d248cdfcc2c4a3 Mon Sep 17 00:00:00 2001 From: Alexander Schwartz Date: Wed, 11 Oct 2023 19:07:19 +0200 Subject: [PATCH] Ensure that the task is not in the tasks queue when it is rejected from the executor Closes #4900 --- .../java/io/vertx/core/impl/TaskQueue.java | 23 +++++++++++-- .../io/vertx/core/impl/TaskQueueTest.java | 34 +++++++++++++++++++ 2 files changed, 54 insertions(+), 3 deletions(-) create mode 100644 src/test/java/io/vertx/core/impl/TaskQueueTest.java diff --git a/src/main/java/io/vertx/core/impl/TaskQueue.java b/src/main/java/io/vertx/core/impl/TaskQueue.java index 37ff11c3cc1..06902308141 100644 --- a/src/main/java/io/vertx/core/impl/TaskQueue.java +++ b/src/main/java/io/vertx/core/impl/TaskQueue.java @@ -16,6 +16,7 @@ import java.util.LinkedList; import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; /** * A task queue that always run all tasks in order. The executor to run the tasks is passed @@ -87,11 +88,27 @@ private void run() { */ public void execute(Runnable task, Executor executor) { synchronized (tasks) { - tasks.add(new Task(task, executor)); if (current == null) { - current = executor; - executor.execute(runner); + try { + current = executor; + executor.execute(runner); + } catch (RejectedExecutionException e) { + current = null; + throw e; + } } + // Add the task after the runner has been accepted to the executor + // to cover the case of a rejected execution exception. + tasks.add(new Task(task, executor)); + } + } + + /** + * Test if the task queue is empty and no current executor is running anymore. + */ + public boolean isEmpty() { + synchronized (tasks) { + return tasks.isEmpty() && current == null; } } } diff --git a/src/test/java/io/vertx/core/impl/TaskQueueTest.java b/src/test/java/io/vertx/core/impl/TaskQueueTest.java new file mode 100644 index 00000000000..36b2092f1df --- /dev/null +++ b/src/test/java/io/vertx/core/impl/TaskQueueTest.java @@ -0,0 +1,34 @@ +package io.vertx.core.impl; + +import org.assertj.core.api.Assertions; +import org.junit.Test; + +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * @author Alexander Schwartz + */ +public class TaskQueueTest { + + Executor executorThatAlwaysThrowsRejectedExceptions = new Executor() { + @Override + public void execute(Runnable command) { + throw new RejectedExecutionException(); + } + }; + + TaskQueue taskQueue = new TaskQueue(); + + @Test + public void shouldNotHaveTaskInQueueWhenTaskHasBeenRejected() { + assertThatThrownBy( + () -> taskQueue.execute(new Thread(), executorThatAlwaysThrowsRejectedExceptions) + ).isInstanceOf(RejectedExecutionException.class); + + Assertions.assertThat(taskQueue.isEmpty()).isTrue(); + } + +}