Skip to content

Commit

Permalink
Adding timeout and rejection logic
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-buttner committed Oct 12, 2023
1 parent 0cb27e0 commit acc8ba0
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.inference.external.http.HttpClient;
import org.elasticsearch.xpack.inference.external.http.HttpResult;

Expand Down Expand Up @@ -54,17 +56,25 @@ class HttpRequestExecutorService extends AbstractExecutorService {
private final CountDownLatch terminationLatch = new CountDownLatch(1);
private final HttpClientContext httpContext;
private final HttpClient httpClient;
private final ThreadPool threadPool;

@SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors")
HttpRequestExecutorService(ThreadContext contextHolder, String serviceName, HttpClient httpClient) {
this(contextHolder, serviceName, httpClient, null);
HttpRequestExecutorService(ThreadContext contextHolder, String serviceName, HttpClient httpClient, ThreadPool threadPool) {
this(contextHolder, serviceName, httpClient, threadPool, null);
}

@SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors")
HttpRequestExecutorService(ThreadContext contextHolder, String serviceName, HttpClient httpClient, @Nullable Integer capacity) {
HttpRequestExecutorService(
ThreadContext contextHolder,
String serviceName,
HttpClient httpClient,
ThreadPool threadPool,
@Nullable Integer capacity
) {
this.contextHolder = Objects.requireNonNull(contextHolder);
this.serviceName = Objects.requireNonNull(serviceName);
this.httpClient = Objects.requireNonNull(httpClient);
this.threadPool = Objects.requireNonNull(threadPool);
this.httpContext = HttpClientContext.create();

if (capacity == null) {
Expand All @@ -91,6 +101,7 @@ public void start() {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
notifyRequestsOfShutdown();
terminationLatch.countDown();
}
}
Expand All @@ -104,6 +115,31 @@ private void executeTask(HttpTask task) {
EsExecutors.rethrowErrors(ThreadContext.unwrap(task));
}

private void notifyRequestsOfShutdown() {
try {
List<HttpTask> notExecuted = new ArrayList<>();
queue.drainTo(notExecuted);

for (HttpTask task : notExecuted) {
rejectTask(task);
}
} catch (Exception e) {
logger.warn(format("Failed to notify tasks of queuing service [%s] shutdown", serviceName));
}
}

private void rejectTask(HttpTask task) {
try {
task.onRejection(
new EsRejectedExecutionException(format("Failed to send request, queue service [%s] has shutdown", serviceName), true)
);
} catch (Exception e) {
logger.warn(
format("Failed to notify request [%s] for service [%s] of rejection after queuing service shutdown", task, serviceName)
);
}
}

public int queueSize() {
return queue.size();
}
Expand Down Expand Up @@ -140,10 +176,13 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE
/**
* Send the request at some point in the future.
* @param request the http request to send
* @param timeout the maximum time to wait for this request to complete (failing or successfully). Once the time elapses, the
* listener::onFailure is called with a {@link java.util.concurrent.TimeoutException}. If null, then the request will
* wait forever
* @param listener an {@link ActionListener<HttpResult>} for the response or failure
*/
public void send(HttpRequestBase request, ActionListener<HttpResult> listener) {
RequestTask task = new RequestTask(request, httpClient, httpContext, listener);
public void send(HttpRequestBase request, @Nullable TimeValue timeout, ActionListener<HttpResult> listener) {
RequestTask task = new RequestTask(request, httpClient, httpContext, timeout, threadPool, listener);

if (isShutdown()) {
EsRejectedExecutionException rejected = new EsRejectedExecutionException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,46 +13,115 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.inference.external.http.HttpClient;
import org.elasticsearch.xpack.inference.external.http.HttpResult;

import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.xpack.inference.InferencePlugin.UTILITY_THREAD_POOL_NAME;

class RequestTask extends HttpTask {
private static final Logger logger = LogManager.getLogger(RequestTask.class);
private static final Scheduler.Cancellable NOOP_TIMEOUT_HANDLER = createDefaultHandler();

private final HttpUriRequest request;
private final ActionListener<HttpResult> listener;
private final HttpClient httpClient;
private final HttpClientContext context;
private final Scheduler.Cancellable timeoutHandler;
private final AtomicBoolean notified = new AtomicBoolean();
private final TimeValue timeout;

RequestTask(HttpUriRequest request, HttpClient httpClient, HttpClientContext context, ActionListener<HttpResult> listener) {
RequestTask(
HttpUriRequest request,
HttpClient httpClient,
HttpClientContext context,
@Nullable TimeValue timeout,
ThreadPool threadPool,
ActionListener<HttpResult> listener
) {
this.request = Objects.requireNonNull(request);
this.httpClient = Objects.requireNonNull(httpClient);
this.listener = Objects.requireNonNull(listener);
this.context = Objects.requireNonNull(context);
this.timeout = timeout;
this.timeoutHandler = startTimer(threadPool, timeout);
}

private Scheduler.Cancellable startTimer(ThreadPool threadPool, TimeValue timeout) {
Objects.requireNonNull(threadPool);

if (timeout == null) {
return NOOP_TIMEOUT_HANDLER;
}

return threadPool.schedule(this::onTimeout, timeout, threadPool.executor(UTILITY_THREAD_POOL_NAME));
}

private void onTimeout() {
assert timeout != null : "timeout must be defined to use a timeout handler";
logger.debug(() -> format("Request [%s] timed out after [%s] while waiting to be executed", request.getRequestLine(), timeout));
notifyOfResult(
() -> listener.onFailure(new TimeoutException(format("Request timed out waiting to be executed after [%s]", timeout)))
);
}

private void notifyOfResult(Runnable runnable) {
if (notified.compareAndSet(false, true)) {
runnable.run();
return;
}

logger.debug(() -> format("Attempting to notify of result after already doing so for request [%s]", request.getRequestLine()));
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
timeoutHandler.cancel();
notifyOfResult(() -> listener.onFailure(e));
}

@Override
protected void doRun() throws Exception {
ActionListener<HttpResult> notificationListener = ActionListener.wrap(this::onSuccess, this::onFailure);

try {
httpClient.send(request, context, listener);
httpClient.send(request, context, notificationListener);
} catch (IOException e) {
logger.error(format("Failed to send request [%s] via the http client", request.getRequestLine()), e);
listener.onFailure(new ElasticsearchException(format("Failed to send request [%s]", request.getRequestLine()), e));
}
}

private void onSuccess(HttpResult result) {
timeoutHandler.cancel();
notifyOfResult(() -> listener.onResponse(result));
}

@Override
public String toString() {
return request.getRequestLine().toString();
}

private static Scheduler.Cancellable createDefaultHandler() {
return new Scheduler.Cancellable() {
@Override
public boolean cancel() {
return true;
}

@Override
public boolean isCancelled() {
return true;
}
};
}
}

0 comments on commit acc8ba0

Please sign in to comment.