From b0f3e7d7e5a748392b9cac8de01dd20f481cb640 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 30 Oct 2023 11:31:33 +0100 Subject: [PATCH] Allow to configure order of completion in SubscribableListener --- .../action/support/SubscribableListener.java | 73 +++++++++++++++---- .../support/SubscribableListenerTests.java | 28 ++++++- 2 files changed, 85 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java b/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java index cebb4ed6e06e6..99bbaa3e47c46 100644 --- a/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java +++ b/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java @@ -47,6 +47,13 @@ public SubscribableListener() { this(EMPTY); } + /** + * Create a {@link SubscribableListener} which is incomplete. + */ + public SubscribableListener(CompletionOrder completionOrder) { + this(EMPTY, completionOrder); + } + /** * Create a {@link SubscribableListener} which has already succeeded with the given result. */ @@ -72,7 +79,12 @@ public static SubscribableListener newForked(CheckedConsumer + * The completion order only affects subscribing listeners added before this listener is completed. There are no guarantees about the + * ordering of the completions of listeners which are added concurrently with (or after) the completion of this listener. + */ + private final CompletionOrder completionOrder; + /** * Add a listener to this listener's collection of subscribers. If this listener is complete, this method completes the subscribing * listener immediately with the result with which this listener was completed. Otherwise, the subscribing listener is retained and @@ -97,8 +134,8 @@ private SubscribableListener(Object initialState) { * Subscribed listeners must not throw any exceptions. Use {@link ActionListener#wrap(ActionListener)} if you have a listener for which * exceptions from its {@link ActionListener#onResponse} method should be handled by its own {@link ActionListener#onFailure} method. *

- * Listeners added strictly before this listener is completed will themselves be completed in the order in which their subscriptions - * were received. However, there are no guarantees about the ordering of the completions of listeners which are added concurrently with + * Listeners added strictly before this listener is completed will themselves be completed in the order defined by the completion order. + * However, there are no guarantees about the ordering of the completions of listeners which are added concurrently with * (or after) the completion of this listener. *

* If the subscribed listener is not completed immediately then it will be completed on the thread, and in the {@link ThreadContext}, of @@ -116,8 +153,8 @@ public final void addListener(ActionListener listener) { * Subscribed listeners must not throw any exceptions. Use {@link ActionListener#wrap(ActionListener)} if you have a listener for which * exceptions from its {@link ActionListener#onResponse} method should be handled by its own {@link ActionListener#onFailure} method. *

- * Listeners added strictly before this listener is completed will themselves be completed in the order in which their subscriptions - * were received. However, there are no guarantees about the ordering of the completions of listeners which are added concurrently with + * Listeners added strictly before this listener is completed will themselves be completed in the order defined by the completion order. + * However, there are no guarantees about the ordering of the completions of listeners which are added concurrently with * (or after) the completion of this listener. * * @param executor If not {@link EsExecutors#DIRECT_EXECUTOR_SERVICE}, and the subscribing listener is not completed immediately, @@ -259,18 +296,21 @@ private void setResult(Object result) { boolean completed = tryComplete(result, listener); assert completed; } else if (currentState instanceof Cell currCell) { - // multiple subscribers, but they are currently in reverse order of subscription so reverse them back - Cell prevCell = null; - while (true) { - final Cell nextCell = currCell.next; - currCell.next = prevCell; - if (nextCell == null) { - break; + // multiple subscribers - check the order of completion + if (completionOrder == CompletionOrder.FirstAddedFirstCompleted) { + // multiple subscribers currently in reverse order of subscription so reverse them back + Cell prevCell = null; + while (true) { + final Cell nextCell = currCell.next; + currCell.next = prevCell; + if (nextCell == null) { + break; + } + prevCell = currCell; + currCell = nextCell; } - prevCell = currCell; - currCell = nextCell; } - // now they are in subscription order, complete them + // now complete them while (currCell != null) { boolean completed = tryComplete(result, (ActionListener) currCell.listener); assert completed; @@ -378,6 +418,9 @@ public SubscribableListener andThen( @Nullable ThreadContext threadContext, CheckedBiConsumer, T, ? extends Exception> nextStep ) { + if (completionOrder != CompletionOrder.FirstAddedFirstCompleted) { + throw new IllegalArgumentException("Cannot compose a sequence of operations for this subscribable listener"); + } return newForked(l -> addListener(l.delegateFailureAndWrap(nextStep), executor, threadContext)); } diff --git a/server/src/test/java/org/elasticsearch/action/support/SubscribableListenerTests.java b/server/src/test/java/org/elasticsearch/action/support/SubscribableListenerTests.java index 2a0391a57eba6..81cbdb388a5f4 100644 --- a/server/src/test/java/org/elasticsearch/action/support/SubscribableListenerTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/SubscribableListenerTests.java @@ -29,6 +29,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.IntFunction; +import static org.elasticsearch.action.support.SubscribableListener.CompletionOrder.FirstAddedFirstCompleted; +import static org.elasticsearch.action.support.SubscribableListener.CompletionOrder.FirstAddedLastCompleted; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; @@ -50,7 +52,7 @@ public void run() { } public void testSubscriptionOrder() { - var listener = new SubscribableListener<>(); + var listener = randomBoolean() ? new SubscribableListener<>() : new SubscribableListener<>(FirstAddedFirstCompleted); var order = new AtomicInteger(); var subscriberCount = between(0, 4); @@ -73,6 +75,30 @@ public void testSubscriptionOrder() { assertEquals(subscriberCount + 2, order.get()); } + public void testSubscriptionReverseOrder() { + var listener = new SubscribableListener<>(FirstAddedLastCompleted); + var order = new AtomicInteger(); + + var subscriberCount = between(0, 4); + for (int i = subscriberCount; i > 0; i--) { + listener.addListener(ActionListener.running(new OrderAssertingRunnable(i - 1, order))); + } + + assertEquals(0, order.get()); + + if (randomBoolean()) { + listener.onResponse(new Object()); + } else { + listener.onFailure(new ElasticsearchException("test")); + } + + assertEquals(subscriberCount, order.get()); + listener.addListener(ActionListener.running(new OrderAssertingRunnable(subscriberCount, order))); + assertEquals(subscriberCount + 1, order.get()); + listener.addListener(ActionListener.running(new OrderAssertingRunnable(subscriberCount + 1, order))); + assertEquals(subscriberCount + 2, order.get()); + } + public void testOnResponse() { var listener = new SubscribableListener<>(); var order = new AtomicInteger();