Skip to content

Commit

Permalink
Ensure that the task is not in the tasks queue when it is rejected fr…
Browse files Browse the repository at this point in the history
…om the executor

Closes #4900
  • Loading branch information
ahus1 authored and vietj committed Oct 23, 2023
1 parent cc0f05a commit 32ae3bc
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 3 deletions.
23 changes: 20 additions & 3 deletions src/main/java/io/vertx/core/impl/TaskQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.util.LinkedList;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

/**
* A task queue that always run all tasks in order. The executor to run the tasks is passed
Expand Down Expand Up @@ -87,11 +88,27 @@ private void run() {
*/
public void execute(Runnable task, Executor executor) {
synchronized (tasks) {
tasks.add(new Task(task, executor));
if (current == null) {
current = executor;
executor.execute(runner);
try {
current = executor;
executor.execute(runner);
} catch (RejectedExecutionException e) {
current = null;
throw e;
}
}
// Add the task after the runner has been accepted to the executor
// to cover the case of a rejected execution exception.
tasks.add(new Task(task, executor));
}
}

/**
* Test if the task queue is empty and no current executor is running anymore.
*/
public boolean isEmpty() {
synchronized (tasks) {
return tasks.isEmpty() && current == null;
}
}
}
34 changes: 34 additions & 0 deletions src/test/java/io/vertx/core/impl/TaskQueueTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.vertx.core.impl;

import org.assertj.core.api.Assertions;
import org.junit.Test;

import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

import static org.assertj.core.api.Assertions.assertThatThrownBy;

/**
* @author Alexander Schwartz
*/
public class TaskQueueTest {

Executor executorThatAlwaysThrowsRejectedExceptions = new Executor() {
@Override
public void execute(Runnable command) {
throw new RejectedExecutionException();
}
};

TaskQueue taskQueue = new TaskQueue();

@Test
public void shouldNotHaveTaskInQueueWhenTaskHasBeenRejected() {
assertThatThrownBy(
() -> taskQueue.execute(new Thread(), executorThatAlwaysThrowsRejectedExceptions)
).isInstanceOf(RejectedExecutionException.class);

Assertions.assertThat(taskQueue.isEmpty()).isTrue();
}

}

0 comments on commit 32ae3bc

Please sign in to comment.