diff --git a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java index 3c27748daa87d..d4804b18bd160 100644 --- a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java @@ -86,6 +86,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import com.carrotsearch.randomizedtesting.annotations.Timeout; + import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.anyOf; @@ -863,6 +865,7 @@ public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey( AtomicInteger throttledTask3 = new AtomicInteger(); AtomicInteger succeededTask1 = new AtomicInteger(); AtomicInteger succeededTask2 = new AtomicInteger(); + AtomicInteger succeededTask3 = new AtomicInteger(); AtomicInteger timedOutTask3 = new AtomicInteger(); final ClusterStateTaskListener listener = new ClusterStateTaskListener() { @@ -880,6 +883,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS succeededTask1.incrementAndGet(); } else if (source.equals(task2)) { succeededTask2.incrementAndGet(); + } else if (source.equals(task3)) { + succeededTask3.incrementAndGet(); } latch.countDown(); } @@ -955,7 +960,7 @@ public void run() { assertEquals(numberOfTask1, throttledTask1.get() + succeededTask1.get()); assertEquals(numberOfTask2, succeededTask2.get()); assertEquals(0, throttledTask2.get()); - assertEquals(numberOfTask3, throttledTask3.get() + timedOutTask3.get()); + assertEquals(numberOfTask3, throttledTask3.get() + timedOutTask3.get() + succeededTask3.get()); masterService.close(); } @@ -1378,6 +1383,76 @@ public void testDeprecatedMasterServiceUpdateTaskThreadName() { assertThrows(AssertionError.class, () -> MasterService.assertClusterManagerUpdateThread()); } + @Timeout(millis = 5_000) + public void testTaskTimeout() throws InterruptedException { + try (ClusterManagerService clusterManagerService = createClusterManagerService(true)) { + final AtomicInteger failureCount = new AtomicInteger(); + final AtomicInteger successCount = new AtomicInteger(); + final CountDownLatch taskStartLatch = new CountDownLatch(1); + final CountDownLatch blockingTaskLatch = new CountDownLatch(1); + final CountDownLatch timeoutLatch = new CountDownLatch(1); + final ClusterStateTaskListener blockingListener = new ClusterStateTaskListener() { + @Override + public void onFailure(String source, Exception e) { + fail("Unexpected failure"); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + successCount.incrementAndGet(); + taskStartLatch.countDown(); + try { + blockingTaskLatch.await(); + } catch (InterruptedException e) { + fail("Interrupted"); + } + } + }; + final ClusterStateTaskListener timeoutListener = new ClusterStateTaskListener() { + @Override + public void onFailure(String source, Exception e) { + assertEquals("timeout", source); + failureCount.incrementAndGet(); + timeoutLatch.countDown(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + fail("Unexpected success"); + } + }; + + final ClusterStateTaskExecutor executor = (currentState, tasks) -> ClusterStateTaskExecutor.ClusterTasksResult.builder() + .successes(tasks) + .build(currentState); + + // start a task and wait for it to start and block on the clusterStateProcessed callback + clusterManagerService.submitStateUpdateTask( + "success", + new Object(), + ClusterStateTaskConfig.build(randomFrom(Priority.values())), + executor, + blockingListener + ); + taskStartLatch.await(); + + // start a second task that is guaranteed to timeout as the first task is still running + clusterManagerService.submitStateUpdateTask( + "timeout", + new Object(), + ClusterStateTaskConfig.build(randomFrom(Priority.values()), TimeValue.timeValueMillis(1L)), + executor, + timeoutListener + ); + + // wait for the timeout to happen, then unblock and assert one success and one failure + timeoutLatch.await(); + blockingTaskLatch.countDown(); + assertEquals(1, failureCount.get()); + assertEquals(1, successCount.get()); + } + } + /** * Returns the cluster state that the cluster-manager service uses (and that is provided by the discovery layer) */