Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only notify ready global checkpoint listeners #33690

Merged
merged 7 commits into from
Sep 14, 2018
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
Expand All @@ -34,6 +36,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
Expand Down Expand Up @@ -63,7 +66,7 @@ public interface GlobalCheckpointListener {

// guarded by this
private boolean closed;
private Map<GlobalCheckpointListener, ScheduledFuture<?>> listeners;
private final Map<GlobalCheckpointListener, Tuple<Long, ScheduledFuture<?>>> listeners = new LinkedHashMap<>();
private long lastKnownGlobalCheckpoint = UNASSIGNED_SEQ_NO;

private final ShardId shardId;
Expand Down Expand Up @@ -91,62 +94,56 @@ public interface GlobalCheckpointListener {
}

/**
* Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the
* listener will be asynchronously notified on the executor used to construct this collection of global checkpoint listeners. If the
* shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global
* checkpoint listeners. The listener will only be notified of at most one event, either the global checkpoint is updated or the shard
* is closed. A listener must re-register after one of these events to receive subsequent events. Callers may add a timeout to be
* notified after if the timeout elapses. In this case, the listener will be notified with a {@link TimeoutException}. Passing null for
* the timeout means no timeout will be associated to the listener.
* Add a global checkpoint listener. If the global checkpoint is equal to or above the global checkpoint the listener is waiting for,
* then the listener will be asynchronously notified on the executor used to construct this collection of global checkpoint listeners.
* If the shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global
* checkpoint listeners. The listener will only be notified of at most one event, either the global checkpoint is updated to or above
* the global checkpoint the listener is waiting for, or the shard is closed. A listener must re-register after one of these events to
* receive subsequent events. Callers may add a timeout; if the timeout elapses before the listener is notified, the listener will be
* notified with a {@link TimeoutException}. Passing null for the timeout means no timeout will be associated with the listener.
*
* @param currentGlobalCheckpoint the current global checkpoint known to the listener
* @param listener the listener
* @param timeout the listener timeout, or null if no timeout
* @param waitingForGlobalCheckpoint the global checkpoint the listener is waiting for
* @param listener the listener
* @param timeout the listener timeout, or null if no timeout
*/
synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpointListener listener, final TimeValue timeout) {
synchronized void add(final long waitingForGlobalCheckpoint, final GlobalCheckpointListener listener, final TimeValue timeout) {
if (closed) {
executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId)));
return;
}
if (lastKnownGlobalCheckpoint > currentGlobalCheckpoint) {
if (lastKnownGlobalCheckpoint >= waitingForGlobalCheckpoint) {
// notify directly
executor.execute(() -> notifyListener(listener, lastKnownGlobalCheckpoint, null));
} else {
if (listeners == null) {
listeners = new LinkedHashMap<>();
}
if (timeout == null) {
listeners.put(listener, null);
listeners.put(listener, Tuple.tuple(waitingForGlobalCheckpoint, null));
} else {
listeners.put(
listener,
scheduler.schedule(
() -> {
final boolean removed;
synchronized (this) {
/*
* Note that the listeners map can be null if a notification nulled out the map reference when
* notifying listeners, and then our scheduled execution occurred before we could be cancelled by
* the notification. In this case, we would have blocked waiting for access to this critical
* section.
*
* What is more, we know that this listener has a timeout associated with it (otherwise we would
* not be here) so the return value from remove being null is an indication that we are not in the
* map. This can happen if a notification nulled out the listeners, and then our scheduled execution
* occurred before we could be cancelled by the notification, and then another thread added a
* listener causing the listeners map reference to be non-null again. In this case, our listener
* here would not be in the map and we should not fire the timeout logic.
*/
removed = listeners != null && listeners.remove(listener) != null;
}
if (removed) {
final TimeoutException e = new TimeoutException(timeout.getStringRep());
logger.trace("global checkpoint listener timed out", e);
executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, e));
}
},
timeout.nanos(),
TimeUnit.NANOSECONDS));
Tuple.tuple(
waitingForGlobalCheckpoint,
scheduler.schedule(
() -> {
final boolean removed;
synchronized (this) {
/*
* We know that this listener has a timeout associated with it (otherwise we would not be
* here) so the future component of the return value from remove being null is an indication
* that we are not in the map. This can happen if a notification collected us into listeners
* to be notified and removed us from the map, and then our scheduled execution occurred
* before we could be cancelled by the notification. In this case, our listener here would
* not be in the map and we should not fire the timeout logic.
*/
removed = listeners.remove(listener).v2() != null;
}
if (removed) {
final TimeoutException e = new TimeoutException(timeout.getStringRep());
logger.trace("global checkpoint listener timed out", e);
executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, e));
}
},
timeout.nanos(),
TimeUnit.NANOSECONDS)));
}
}
}
Expand All @@ -163,7 +160,7 @@ public synchronized void close() throws IOException {
* @return the number of listeners pending notification
*/
synchronized int pendingListeners() {
return listeners == null ? 0 : listeners.size();
return listeners.size();
}

/**
Expand All @@ -173,7 +170,7 @@ synchronized int pendingListeners() {
* @return a scheduled future representing the timeout future for the listener, otherwise null
*/
synchronized ScheduledFuture<?> getTimeoutFuture(final GlobalCheckpointListener listener) {
return listeners.get(listener);
return listeners.get(listener).v2();
}

/**
Expand All @@ -193,22 +190,31 @@ synchronized void globalCheckpointUpdated(final long globalCheckpoint) {
private void notifyListeners(final long globalCheckpoint, final IndexShardClosedException e) {
assert Thread.holdsLock(this);
assert (globalCheckpoint == UNASSIGNED_SEQ_NO && e != null) || (globalCheckpoint >= NO_OPS_PERFORMED && e == null);
jasontedor marked this conversation as resolved.
Show resolved Hide resolved
if (listeners != null) {
// capture the current listeners
final Map<GlobalCheckpointListener, ScheduledFuture<?>> currentListeners = listeners;
listeners = null;
if (currentListeners != null) {
executor.execute(() -> {
for (final Map.Entry<GlobalCheckpointListener, ScheduledFuture<?>> listener : currentListeners.entrySet()) {
/*
* We do not want to interrupt any timeouts that fired, these will detect that the listener has been notified and
* not trigger the timeout.
*/
FutureUtils.cancel(listener.getValue());
notifyListener(listener.getKey(), globalCheckpoint, e);
}
});
}

final Map<GlobalCheckpointListener, Tuple<Long, ScheduledFuture<?>>> listenersToNotify;
if (globalCheckpoint != UNASSIGNED_SEQ_NO) {
listenersToNotify =
listeners
.entrySet()
.stream()
.filter(entry -> entry.getValue().v1() <= globalCheckpoint)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
listenersToNotify.keySet().forEach(listeners::remove);
} else {
listenersToNotify = new HashMap<>(listeners);
listeners.clear();
}
if (listenersToNotify.isEmpty() == false) {
executor.execute(() ->
listenersToNotify
.forEach((listener, t) -> {
/*
* We do not want to interrupt any timeouts that fired, these will detect that the listener has been
* notified and not trigger the timeout.
*/
FutureUtils.cancel(t.v2());
notifyListener(listener, globalCheckpoint, e);
}));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1781,19 +1781,20 @@ public void updateGlobalCheckpointForShard(final String allocationId, final long
}

/**
* Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the
* listener will fire immediately on the calling thread. If the specified timeout elapses before the listener is notified, the listener
* will be notified with an {@link TimeoutException}. A caller may pass null to specify no timeout.
* Add a global checkpoint listener. If the global checkpoint is equal to or above the global checkpoint the listener is waiting for,
* then the listener will be notified immediately via an executor (so possibly not on the current thread). If the specified timeout
* elapses before the listener is notified, the listener will be notified with a {@link TimeoutException}. A caller may pass null to
* specify no timeout.
*
* @param currentGlobalCheckpoint the current global checkpoint known to the listener
* @param listener the listener
* @param timeout the timeout
* @param waitingForGlobalCheckpoint the global checkpoint the listener is waiting for
* @param listener the listener
* @param timeout the timeout
*/
public void addGlobalCheckpointListener(
final long currentGlobalCheckpoint,
final long waitingForGlobalCheckpoint,
final GlobalCheckpointListeners.GlobalCheckpointListener listener,
final TimeValue timeout) {
this.globalCheckpointListeners.add(currentGlobalCheckpoint, listener, timeout);
this.globalCheckpointListeners.add(waitingForGlobalCheckpoint, listener, timeout);
}

/**
Expand Down
Loading