Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable global checkpoint listeners to timeout #33620

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;

import java.util.concurrent.ExecutionException;
Expand All @@ -30,8 +31,14 @@

public class FutureUtils {

/**
* Cancel execution of this future without interrupting a running thread. See {@link Future#cancel(boolean)} for details.
*
* @param toCancel the future to cancel
* @return false if the future could not be cancelled, otherwise true
*/
@SuppressForbidden(reason = "Future#cancel()")
public static boolean cancel(Future<?> toCancel) {
public static boolean cancel(@Nullable final Future<?> toCancel) {
if (toCancel != null) {
return toCancel.cancel(false); // this method is a forbidden API since it interrupts threads
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,19 @@

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
Expand All @@ -45,51 +51,59 @@ public class GlobalCheckpointListeners implements Closeable {
public interface GlobalCheckpointListener {
/**
* Callback when the global checkpoint is updated or the shard is closed. If the shard is closed, the value of the global checkpoint
* will be set to {@link org.elasticsearch.index.seqno.SequenceNumbers#UNASSIGNED_SEQ_NO} and the exception will be non-null. If the
* global checkpoint is updated, the exception will be null.
* will be set to {@link org.elasticsearch.index.seqno.SequenceNumbers#UNASSIGNED_SEQ_NO} and the exception will be non-null and an
* instance of {@link IndexShardClosedException}. If the listener timed out waiting for notification then the exception will be
* non-null and an instance of {@link TimeoutException}. If the global checkpoint is updated, the exception will be null.
*
* @param globalCheckpoint the updated global checkpoint
* @param e if non-null, the shard is closed
* @param e if non-null, the shard is closed or the listener timed out
*/
void accept(long globalCheckpoint, IndexShardClosedException e);
void accept(long globalCheckpoint, Exception e);
}

// guarded by this
private boolean closed;
private volatile List<GlobalCheckpointListener> listeners;
private volatile Map<GlobalCheckpointListener, ScheduledFuture<?>> listeners;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

afaics, all accesses to listeners are under lock, but I may miss something.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, good catch @dnhatn. In an early version of #32696 this was not the case but during review we changed it so that it is the case, and then missed that volatile is no longer necessary here. I will remove this in a follow-up.

private long lastKnownGlobalCheckpoint = UNASSIGNED_SEQ_NO;

private final ShardId shardId;
private final Executor executor;
private final ScheduledExecutorService scheduler;
private final Logger logger;

/**
* Construct a global checkpoint listeners collection.
*
* @param shardId the shard ID on which global checkpoint updates can be listened to
* @param executor the executor for listener notifications
* @param logger a shard-level logger
* @param shardId the shard ID on which global checkpoint updates can be listened to
* @param executor the executor for listener notifications
* @param scheduler the executor used for scheduling timeouts
* @param logger a shard-level logger
*/
GlobalCheckpointListeners(
final ShardId shardId,
final Executor executor,
final ScheduledExecutorService scheduler,
final Logger logger) {
this.shardId = Objects.requireNonNull(shardId);
this.executor = Objects.requireNonNull(executor);
this.logger = Objects.requireNonNull(logger);
this.shardId = Objects.requireNonNull(shardId, "shardId");
this.executor = Objects.requireNonNull(executor, "executor");
this.scheduler = Objects.requireNonNull(scheduler, "scheduler");
this.logger = Objects.requireNonNull(logger, "logger");
}

/**
* Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the
* listener will be asynchronously notified on the executor used to construct this collection of global checkpoint listeners. If the
* shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global
* checkpoint listeners. The listener will only be notified of at most one event, either the global checkpoint is updated or the shard
* is closed. A listener must re-register after one of these events to receive subsequent events.
* is closed. A listener must re-register after one of these events to receive subsequent events. Callers may add a timeout to be
* notified if the timeout elapses. In this case, the listener will be notified with a {@link TimeoutException}. Passing null for
* the timeout means no timeout will be associated to the listener.
*
* @param currentGlobalCheckpoint the current global checkpoint known to the listener
* @param listener the listener
* @param timeout the listener timeout, or null if no timeout
*/
synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpointListener listener) {
synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpointListener listener, final TimeValue timeout) {
if (closed) {
executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId)));
return;
Expand All @@ -99,9 +113,43 @@ synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpoint
executor.execute(() -> notifyListener(listener, lastKnownGlobalCheckpoint, null));
} else {
if (listeners == null) {
listeners = new ArrayList<>();
listeners = new LinkedHashMap<>();
}
if (timeout == null) {
listeners.put(listener, null);
} else {
listeners.put(
listener,
scheduler.schedule(
() -> {
final boolean removed;
synchronized (this) {
if (listeners == null) {
jasontedor marked this conversation as resolved.
Show resolved Hide resolved
/*
* This can happen if a notification nulled out the listeners, and then our scheduled execution
* occurred before we could be cancelled by the notification. In this case, we would have
* blocked waiting for access to this critical section.
*/
removed = false;
} else {
/*
* Removed can be false here if a notification nulled out the listeners, and then our scheduled
* execution occurred before we could be cancelled by the notification, and then another thread
* added a listener causing listeners to be non-null again.
*/
removed = listeners.containsKey(listener);
listeners.remove(listener);
}
}
if (removed) {
final TimeoutException e = new TimeoutException(timeout.getStringRep());
logger.trace("global checkpoint listener timed out", e);
executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, e));
}
},
timeout.nanos(),
TimeUnit.NANOSECONDS));
}
listeners.add(listener);
}
}

Expand All @@ -111,10 +159,25 @@ public synchronized void close() throws IOException {
notifyListeners(UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId));
}

/**
* The number of listeners currently pending for notification.
*
* @return the number of listeners pending notification
*/
synchronized int pendingListeners() {
    // the listeners map is created lazily on first registration and reset to null when listeners are notified,
    // so a null map means that nothing is pending
    if (listeners == null) {
        return 0;
    }
    return listeners.size();
}

/**
* The scheduled future for a listener that has a timeout associated with it, otherwise null.
*
* @param listener the listener to get the scheduled future for
* @return a scheduled future representing the timeout future for the listener, otherwise null
*/
synchronized ScheduledFuture<?> timeout(final GlobalCheckpointListener listener) {
jasontedor marked this conversation as resolved.
Show resolved Hide resolved
return listeners.get(listener);
}

/**
* Invoke to notify all registered listeners of an updated global checkpoint.
*
Expand All @@ -134,19 +197,24 @@ private void notifyListeners(final long globalCheckpoint, final IndexShardClosed
assert (globalCheckpoint == UNASSIGNED_SEQ_NO && e != null) || (globalCheckpoint >= NO_OPS_PERFORMED && e == null);
if (listeners != null) {
// capture the current listeners
final List<GlobalCheckpointListener> currentListeners = listeners;
final Map<GlobalCheckpointListener, ScheduledFuture<?>> currentListeners = listeners;
listeners = null;
if (currentListeners != null) {
executor.execute(() -> {
for (final GlobalCheckpointListener listener : currentListeners) {
notifyListener(listener, globalCheckpoint, e);
for (final Map.Entry<GlobalCheckpointListener, ScheduledFuture<?>> listener : currentListeners.entrySet()) {
/*
* We do not want to interrupt any timeouts that fired, these will detect that the listener has been notified and
* not trigger the timeout.
*/
FutureUtils.cancel(listener.getValue());
notifyListener(listener.getKey(), globalCheckpoint, e);
}
});
}
}
}

private void notifyListener(final GlobalCheckpointListener listener, final long globalCheckpoint, final IndexShardClosedException e) {
private void notifyListener(final GlobalCheckpointListener listener, final long globalCheckpoint, final Exception e) {
try {
listener.accept(globalCheckpoint, e);
} catch (final Exception caught) {
Expand All @@ -156,8 +224,11 @@ private void notifyListener(final GlobalCheckpointListener listener, final long
"error notifying global checkpoint listener of updated global checkpoint [{}]",
globalCheckpoint),
caught);
} else {
} else if (e instanceof IndexShardClosedException) {
logger.warn("error notifying global checkpoint listener of closed shard", caught);
} else {
assert e instanceof TimeoutException : e;
logger.warn("error notifying global checkpoint listener of timeout", caught);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why not just always log the exception (to serve as an indicator of what happened) and assert it's either IndexShardClosedException or TimeoutException?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer it as is since it's more explicit, and since your approach only saves two lines of code I'll stick with mine. 😇

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,8 @@ public IndexShard(
this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays);
final String aId = shardRouting.allocationId().getId();
this.globalCheckpointListeners = new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), logger);
this.globalCheckpointListeners =
new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), threadPool.scheduler(), logger);
this.replicationTracker =
new ReplicationTracker(shardId, aId, indexSettings, UNASSIGNED_SEQ_NO, globalCheckpointListeners::globalCheckpointUpdated);

Expand Down Expand Up @@ -1781,15 +1782,18 @@ public void updateGlobalCheckpointForShard(final String allocationId, final long

/**
* Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the
* listener will fire immediately on the calling thread.
* listener will fire immediately on the calling thread. If the specified timeout elapses before the listener is notified, the listener
* will be notified with a {@link TimeoutException}. A caller may pass null to specify no timeout.
*
* @param currentGlobalCheckpoint the current global checkpoint known to the listener
* @param listener the listener
* @param timeout the timeout
*/
public void addGlobalCheckpointListener(
final long currentGlobalCheckpoint,
final GlobalCheckpointListeners.GlobalCheckpointListener listener) {
this.globalCheckpointListeners.add(currentGlobalCheckpoint, listener);
final GlobalCheckpointListeners.GlobalCheckpointListener listener,
final TimeValue timeout) {
this.globalCheckpointListeners.add(currentGlobalCheckpoint, listener, timeout);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.test.ESTestCase;

import java.util.concurrent.Future;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

/**
 * Tests for {@link FutureUtils#cancel(Future)}.
 */
public class FutureUtilsTests extends ESTestCase {

    /**
     * Passing a null future to cancel must complete without throwing.
     */
    public void testCancellingNullFutureOkay() {
        FutureUtils.cancel(null);
    }

    /**
     * Cancelling a future must delegate to {@link Future#cancel(boolean)} with {@code false}, that is, without requesting
     * that a running thread be interrupted.
     */
    public void testRunningFutureNotInterrupted() {
        final Future<?> mockedFuture = mock(Future.class);

        FutureUtils.cancel(mockedFuture);

        verify(mockedFuture).cancel(false);
    }

}
Loading