Skip to content

Commit

Permalink
Allow to configure order of completion in SubscribableListener
Browse files Browse the repository at this point in the history
  • Loading branch information
tlrx committed Oct 30, 2023
1 parent 61ff924 commit b0f3e7d
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ public SubscribableListener() {
this(EMPTY);
}

/**
* Create a {@link SubscribableListener} which is incomplete.
*/
public SubscribableListener(CompletionOrder completionOrder) {
this(EMPTY, completionOrder);
}

/**
* Create a {@link SubscribableListener} which has already succeeded with the given result.
*/
Expand All @@ -72,7 +79,12 @@ public static <T> SubscribableListener<T> newForked(CheckedConsumer<ActionListen
}

private SubscribableListener(Object initialState) {
state = initialState;
this(initialState, DEFAULT_COMPLETION_ORDER);
}

private SubscribableListener(Object initialState, CompletionOrder completionOrder) {
this.state = initialState;
this.completionOrder = completionOrder;
}

/**
Expand All @@ -89,6 +101,31 @@ private SubscribableListener(Object initialState) {
@SuppressWarnings("FieldMayBeFinal") // updated via VH_STATE_FIELD (and _only_ via VH_STATE_FIELD)
private volatile Object state;

public enum CompletionOrder {
/**
* Listener are completed in which their subscriptions were received
*/
FirstAddedFirstCompleted,

/**
* Listener are completed in the reverse order in which their subscriptions were received
*/
FirstAddedLastCompleted,
}

static final CompletionOrder DEFAULT_COMPLETION_ORDER = CompletionOrder.FirstAddedFirstCompleted;

/**
* Order in which the subscribing listeners will be completed. By default, the completion order is set to
* {@link CompletionOrder#FirstAddedFirstCompleted} and subscribing listeners will be completed in the order in which their
* subscriptions were received. When set to {@link CompletionOrder#FirstAddedLastCompleted} the subscribing listeners will be completed
* in the reverse order in which they subscribed to this listener.
* <p>
* The completion order only affects subscribing listeners added before this listener is completed. There are no guarantees about the
* ordering of the completions of listeners which are added concurrently with (or after) the completion of this listener.
*/
private final CompletionOrder completionOrder;

/**
* Add a listener to this listener's collection of subscribers. If this listener is complete, this method completes the subscribing
* listener immediately with the result with which this listener was completed. Otherwise, the subscribing listener is retained and
Expand All @@ -97,8 +134,8 @@ private SubscribableListener(Object initialState) {
* Subscribed listeners must not throw any exceptions. Use {@link ActionListener#wrap(ActionListener)} if you have a listener for which
* exceptions from its {@link ActionListener#onResponse} method should be handled by its own {@link ActionListener#onFailure} method.
* <p>
* Listeners added strictly before this listener is completed will themselves be completed in the order in which their subscriptions
* were received. However, there are no guarantees about the ordering of the completions of listeners which are added concurrently with
* Listeners added strictly before this listener is completed will themselves be completed in the order defined by the completion order.
* However, there are no guarantees about the ordering of the completions of listeners which are added concurrently with
* (or after) the completion of this listener.
* <p>
* If the subscribed listener is not completed immediately then it will be completed on the thread, and in the {@link ThreadContext}, of
Expand All @@ -116,8 +153,8 @@ public final void addListener(ActionListener<T> listener) {
* Subscribed listeners must not throw any exceptions. Use {@link ActionListener#wrap(ActionListener)} if you have a listener for which
* exceptions from its {@link ActionListener#onResponse} method should be handled by its own {@link ActionListener#onFailure} method.
* <p>
* Listeners added strictly before this listener is completed will themselves be completed in the order in which their subscriptions
* were received. However, there are no guarantees about the ordering of the completions of listeners which are added concurrently with
* Listeners added strictly before this listener is completed will themselves be completed in the order defined by the completion order.
* However, there are no guarantees about the ordering of the completions of listeners which are added concurrently with
* (or after) the completion of this listener.
*
* @param executor If not {@link EsExecutors#DIRECT_EXECUTOR_SERVICE}, and the subscribing listener is not completed immediately,
Expand Down Expand Up @@ -259,18 +296,21 @@ private void setResult(Object result) {
boolean completed = tryComplete(result, listener);
assert completed;
} else if (currentState instanceof Cell currCell) {
// multiple subscribers, but they are currently in reverse order of subscription so reverse them back
Cell prevCell = null;
while (true) {
final Cell nextCell = currCell.next;
currCell.next = prevCell;
if (nextCell == null) {
break;
// multiple subscribers - check the order of completion
if (completionOrder == CompletionOrder.FirstAddedFirstCompleted) {
// multiple subscribers currently in reverse order of subscription so reverse them back
Cell prevCell = null;
while (true) {
final Cell nextCell = currCell.next;
currCell.next = prevCell;
if (nextCell == null) {
break;
}
prevCell = currCell;
currCell = nextCell;
}
prevCell = currCell;
currCell = nextCell;
}
// now they are in subscription order, complete them
// now complete them
while (currCell != null) {
boolean completed = tryComplete(result, (ActionListener<T>) currCell.listener);
assert completed;
Expand Down Expand Up @@ -378,6 +418,9 @@ public <U> SubscribableListener<U> andThen(
@Nullable ThreadContext threadContext,
CheckedBiConsumer<ActionListener<U>, T, ? extends Exception> nextStep
) {
if (completionOrder != CompletionOrder.FirstAddedFirstCompleted) {
throw new IllegalArgumentException("Cannot compose a sequence of operations for this subscribable listener");
}
return newForked(l -> addListener(l.delegateFailureAndWrap(nextStep), executor, threadContext));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntFunction;

import static org.elasticsearch.action.support.SubscribableListener.CompletionOrder.FirstAddedFirstCompleted;
import static org.elasticsearch.action.support.SubscribableListener.CompletionOrder.FirstAddedLastCompleted;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;

Expand All @@ -50,7 +52,7 @@ public void run() {
}

public void testSubscriptionOrder() {
var listener = new SubscribableListener<>();
var listener = randomBoolean() ? new SubscribableListener<>() : new SubscribableListener<>(FirstAddedFirstCompleted);
var order = new AtomicInteger();

var subscriberCount = between(0, 4);
Expand All @@ -73,6 +75,30 @@ public void testSubscriptionOrder() {
assertEquals(subscriberCount + 2, order.get());
}

public void testSubscriptionReverseOrder() {
var listener = new SubscribableListener<>(FirstAddedLastCompleted);
var order = new AtomicInteger();

var subscriberCount = between(0, 4);
for (int i = subscriberCount; i > 0; i--) {
listener.addListener(ActionListener.running(new OrderAssertingRunnable(i - 1, order)));
}

assertEquals(0, order.get());

if (randomBoolean()) {
listener.onResponse(new Object());
} else {
listener.onFailure(new ElasticsearchException("test"));
}

assertEquals(subscriberCount, order.get());
listener.addListener(ActionListener.running(new OrderAssertingRunnable(subscriberCount, order)));
assertEquals(subscriberCount + 1, order.get());
listener.addListener(ActionListener.running(new OrderAssertingRunnable(subscriberCount + 1, order)));
assertEquals(subscriberCount + 2, order.get());
}

public void testOnResponse() {
var listener = new SubscribableListener<>();
var order = new AtomicInteger();
Expand Down

0 comments on commit b0f3e7d

Please sign in to comment.