Skip to content

Commit

Permalink
[ML] Adding request queuing for http requests (#100674)
Browse files Browse the repository at this point in the history
* Tests are really slow

* Closing services

* Cleaning up code

* Fixing spotless

* Adding some logging for evictor thread

* Using a custom method for sending requests in the queue

* Adding timeout and rejection logic

* Fixing merge failure

* Revert "Adding timeout and rejection logic"

This reverts commit acc8ba0.

* Removing rethrow

* Reverting node.java changes

---------

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
  • Loading branch information
jonathan-buttner and elasticmachine authored Oct 16, 2023
1 parent 7df7776 commit e26ad8f
Show file tree
Hide file tree
Showing 18 changed files with 837 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@

import org.elasticsearch.action.ActionListener;

import java.io.Closeable;
import java.util.Map;
import java.util.Set;

public interface InferenceService {
public interface InferenceService extends Closeable {

String name();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ public void infer(Model model, String input, Map<String, Object> taskSettings, A
/**
 * Starts the given model. This implementation performs no work and immediately
 * reports success by completing the listener with {@code true}.
 *
 * @param model    the model to start (not inspected here)
 * @param listener notified with {@code true} right away
 */
public void start(Model model, ActionListener<Boolean> listener) {
listener.onResponse(true);
}

/** No-op: the body is empty, so nothing is released when this service is closed. */
@Override
public void close() throws IOException {}
}

public static class TestServiceModel extends Model {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.xpack.inference.action.TransportPutInferenceModelAction;
import org.elasticsearch.xpack.inference.external.http.HttpClientManager;
import org.elasticsearch.xpack.inference.external.http.HttpSettings;
import org.elasticsearch.xpack.inference.external.http.sender.HttpRequestSenderFactory;
import org.elasticsearch.xpack.inference.registry.ModelRegistry;
import org.elasticsearch.xpack.inference.rest.RestDeleteInferenceModelAction;
import org.elasticsearch.xpack.inference.rest.RestGetInferenceModelAction;
Expand All @@ -69,9 +70,10 @@ public class InferencePlugin extends Plugin implements ActionPlugin, InferenceSe

public static final String NAME = "inference";
public static final String UTILITY_THREAD_POOL_NAME = "inference_utility";
public static final String HTTP_CLIENT_SENDER_THREAD_POOL_NAME = "inference_http_client_sender";
private final Settings settings;
private final SetOnce<HttpClientManager> httpClientManager = new SetOnce<>();
private final SetOnce<HttpRequestSenderFactory> httpRequestSenderFactory = new SetOnce<>();
// We'll keep a reference to the http manager just in case the inference services don't get closed individually
private final SetOnce<HttpClientManager> httpManager = new SetOnce<>();

public InferencePlugin(Settings settings) {
this.settings = settings;
Expand Down Expand Up @@ -122,8 +124,8 @@ public Collection<Object> createComponents(
AllocationService allocationService,
IndicesService indicesService
) {
httpClientManager.set(HttpClientManager.create(settings, threadPool, clusterService));

httpManager.set(HttpClientManager.create(settings, threadPool, clusterService));
httpRequestSenderFactory.set(new HttpRequestSenderFactory(threadPool, httpManager.get()));
ModelRegistry modelRegistry = new ModelRegistry(client);
return List.of(modelRegistry);
}
Expand Down Expand Up @@ -165,19 +167,6 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settingsToUse) {
TimeValue.timeValueMinutes(10),
false,
"xpack.inference.utility_thread_pool"
),
/*
* This executor is specifically for enqueuing requests to be sent. The underlying
* connection pool used by the http client will block if there are no available connections to lease.
* See here for more info: https://hc.apache.org/httpcomponents-client-4.5.x/current/tutorial/html/connmgmt.html
*/
new ScalingExecutorBuilder(
HTTP_CLIENT_SENDER_THREAD_POOL_NAME,
0,
1,
TimeValue.timeValueMinutes(10),
false,
"xpack.inference.http_client_sender_thread_pool"
)
);
}
Expand Down Expand Up @@ -209,8 +198,8 @@ public List<NamedWriteableRegistry.Entry> getInferenceServiceNamedWriteables() {

@Override
public void close() {
if (httpClientManager.get() != null) {
IOUtils.closeWhileHandlingException(httpClientManager.get());
if (httpManager.get() != null) {
IOUtils.closeWhileHandlingException(httpManager.get());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@

import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.common.socket.SocketAccess;
Expand All @@ -26,7 +26,6 @@
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.xpack.inference.InferencePlugin.HTTP_CLIENT_SENDER_THREAD_POOL_NAME;
import static org.elasticsearch.xpack.inference.InferencePlugin.UTILITY_THREAD_POOL_NAME;

public class HttpClient implements Closeable {
Expand Down Expand Up @@ -72,21 +71,11 @@ public void start() {
}
}

public void send(HttpUriRequest request, ActionListener<HttpResult> listener) {
public void send(HttpUriRequest request, HttpClientContext context, ActionListener<HttpResult> listener) throws IOException {
// The caller must call start() first before attempting to send a request
assert status.get() == Status.STARTED;
assert status.get() == Status.STARTED : "call start() before attempting to send a request";

threadPool.executor(HTTP_CLIENT_SENDER_THREAD_POOL_NAME).execute(() -> {
try {
doPrivilegedSend(request, listener);
} catch (IOException e) {
listener.onFailure(new ElasticsearchException(format("Failed to send request [%s]", request.getRequestLine()), e));
}
});
}

private void doPrivilegedSend(HttpUriRequest request, ActionListener<HttpResult> listener) throws IOException {
SocketAccess.doPrivileged(() -> client.execute(request, new FutureCallback<>() {
SocketAccess.doPrivileged(() -> client.execute(request, context, new FutureCallback<>() {
@Override
public void completed(HttpResponse response) {
respondUsingUtilityThread(response, request, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@
import java.io.IOException;
import java.util.List;

import static org.elasticsearch.core.Strings.format;

public class HttpClientManager implements Closeable {
private static final Logger logger = LogManager.getLogger(HttpClientManager.class);
/**
* From googling around, the connection pool's maxTotal value should be close to the number of available threads.
*
* https://stackoverflow.com/questions/30989637/how-to-decide-optimal-settings-for-setmaxtotal-and-setdefaultmaxperroute
*/
static final Setting<Integer> MAX_CONNECTIONS = Setting.intSetting(
public static final Setting<Integer> MAX_CONNECTIONS = Setting.intSetting(
"xpack.inference.http.max_connections",
// TODO pick a reasonable value here
20,
Expand All @@ -42,15 +44,15 @@ public class HttpClientManager implements Closeable {
);

private static final TimeValue DEFAULT_CONNECTION_EVICTION_THREAD_INTERVAL_TIME = TimeValue.timeValueSeconds(10);
static final Setting<TimeValue> CONNECTION_EVICTION_THREAD_INTERVAL_SETTING = Setting.timeSetting(
public static final Setting<TimeValue> CONNECTION_EVICTION_THREAD_INTERVAL_SETTING = Setting.timeSetting(
"xpack.inference.http.connection_eviction_interval",
DEFAULT_CONNECTION_EVICTION_THREAD_INTERVAL_TIME,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private static final TimeValue DEFAULT_CONNECTION_EVICTION_MAX_IDLE_TIME_SETTING = DEFAULT_CONNECTION_EVICTION_THREAD_INTERVAL_TIME;
static final Setting<TimeValue> CONNECTION_EVICTION_MAX_IDLE_TIME_SETTING = Setting.timeSetting(
public static final Setting<TimeValue> CONNECTION_EVICTION_MAX_IDLE_TIME_SETTING = Setting.timeSetting(
"xpack.inference.http.connection_eviction_max_idle_time",
DEFAULT_CONNECTION_EVICTION_MAX_IDLE_TIME_SETTING,
Setting.Property.NodeScope,
Expand Down Expand Up @@ -128,29 +130,34 @@ public HttpClient getHttpClient() {
@Override
public void close() throws IOException {
httpClient.close();
connectionEvictor.stop();
connectionEvictor.close();
}

/**
 * Applies a new connection limit to the connection manager. The same value is used
 * for both the pool-wide total and the default per-route maximum.
 *
 * @param maxConnections the limit passed to {@code setMaxTotal} and {@code setDefaultMaxPerRoute}
 */
private void setMaxConnections(int maxConnections) {
connectionManager.setMaxTotal(maxConnections);
connectionManager.setDefaultMaxPerRoute(maxConnections);
}

// This is only used for testing
/**
 * Reports whether the current connection evictor considers itself running.
 * Delegates directly to {@code connectionEvictor.isRunning()}.
 */
boolean isEvictionThreadRunning() {
return connectionEvictor.isRunning();
}

// default for testing
void setEvictionInterval(TimeValue evictionInterval) {
logger.debug(() -> format("Eviction thread's interval time updated to [%s]", evictionInterval));

evictorSettings = new EvictorSettings(evictionInterval, evictorSettings.evictionMaxIdle);

connectionEvictor.stop();
connectionEvictor.close();
connectionEvictor = createConnectionEvictor();
connectionEvictor.start();
}

void setEvictionMaxIdle(TimeValue evictionMaxIdle) {
logger.debug(() -> format("Eviction thread's max idle time updated to [%s]", evictionMaxIdle));
evictorSettings = new EvictorSettings(evictorSettings.evictionInterval, evictionMaxIdle);

connectionEvictor.stop();
connectionEvictor = createConnectionEvictor();
connectionEvictor.start();
connectionEvictor.setMaxIdleTime(evictionMaxIdle);
}

private static class EvictorSettings {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.xpack.inference.InferencePlugin.UTILITY_THREAD_POOL_NAME;

/**
Expand All @@ -30,13 +32,13 @@
*
* See <a href="https://hc.apache.org/httpcomponents-client-4.5.x/current/tutorial/html/connmgmt.html#d5e418">here for more info.</a>
*/
public class IdleConnectionEvictor {
public class IdleConnectionEvictor implements Closeable {
private static final Logger logger = LogManager.getLogger(IdleConnectionEvictor.class);

private final ThreadPool threadPool;
private final NHttpClientConnectionManager connectionManager;
private final TimeValue sleepTime;
private final TimeValue maxIdleTime;
private final AtomicReference<TimeValue> maxIdleTime = new AtomicReference<>();
private final AtomicReference<Scheduler.Cancellable> cancellableTask = new AtomicReference<>();

public IdleConnectionEvictor(
Expand All @@ -45,10 +47,14 @@ public IdleConnectionEvictor(
TimeValue sleepTime,
TimeValue maxIdleTime
) {
this.threadPool = threadPool;
this.threadPool = Objects.requireNonNull(threadPool);
this.connectionManager = Objects.requireNonNull(connectionManager);
this.sleepTime = sleepTime;
this.maxIdleTime = maxIdleTime;
this.sleepTime = Objects.requireNonNull(sleepTime);
this.maxIdleTime.set(maxIdleTime);
}

/**
 * Replaces the maximum idle time held in the {@code maxIdleTime} atomic reference.
 * The scheduled eviction task in this class reads the reference each time it runs,
 * and it skips closing idle connections when the stored value is {@code null}.
 *
 * @param maxIdleTime the new maximum idle time; stored as-is without validation
 */
public void setMaxIdleTime(TimeValue maxIdleTime) {
this.maxIdleTime.set(maxIdleTime);
}

public synchronized void start() {
Expand All @@ -58,11 +64,13 @@ public synchronized void start() {
}

private void startInternal() {
logger.debug(() -> format("Idle connection evictor started with wait time: [%s] max idle: [%s]", sleepTime, maxIdleTime));

Scheduler.Cancellable task = threadPool.scheduleWithFixedDelay(() -> {
try {
connectionManager.closeExpiredConnections();
if (maxIdleTime != null) {
connectionManager.closeIdleConnections(maxIdleTime.millis(), TimeUnit.MILLISECONDS);
if (maxIdleTime.get() != null) {
connectionManager.closeIdleConnections(maxIdleTime.get().millis(), TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
logger.warn("HTTP connection eviction failed", e);
Expand All @@ -72,8 +80,10 @@ private void startInternal() {
cancellableTask.set(task);
}

public void stop() {
@Override
public void close() {
if (cancellableTask.get() != null) {
logger.debug("Idle connection evictor closing");
cancellableTask.get().cancel();
}
}
Expand Down
Loading

0 comments on commit e26ad8f

Please sign in to comment.