Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor model manager inference api #17

Merged
merged 1 commit into from
Oct 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,26 @@

import ai.djl.ModelException;
import ai.djl.modality.Input;
import ai.djl.modality.Output;
import ai.djl.ndarray.BytesSupplier;
import ai.djl.repository.zoo.ModelNotFoundException;
import ai.djl.serving.models.ModelManager;
import ai.djl.serving.util.ConfigManager;
import ai.djl.serving.util.NettyUtils;
import ai.djl.serving.wlm.Job;
import ai.djl.serving.wlm.ModelInfo;
import ai.djl.serving.wlm.util.WlmCapacityException;
import ai.djl.serving.wlm.util.WlmShutdownException;
import ai.djl.translate.TranslateException;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.QueryStringDecoder;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.slf4j.Logger;
Expand Down Expand Up @@ -174,7 +183,7 @@ private void predict(
ConfigManager.getInstance().getMaxBatchDelay(),
ConfigManager.getInstance().getMaxIdleTime())
.thenApply(m -> modelManager.triggerModelUpdated(m.scaleWorkers(1, -1)))
.thenAccept(m -> modelManager.runJob(ctx, new Job(m, input)));
.thenAccept(m -> runJob(modelManager, ctx, new Job(m, input)));
return;
}

Expand All @@ -183,6 +192,70 @@ private void predict(
return;
}

modelManager.runJob(ctx, new Job(model, input));
runJob(modelManager, ctx, new Job(model, input));
}

void runJob(ModelManager modelManager, ChannelHandlerContext ctx, Job job) {
modelManager
.runJob(job)
.whenComplete(
(o, t) -> {
if (o != null) {
sendOutput(o, ctx);
}
})
.exceptionally(
t -> {
onException(t.getCause(), ctx);
return null;
});
}

void sendOutput(Output output, ChannelHandlerContext ctx) {
FullHttpResponse resp =
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, false);
for (Map.Entry<String, String> entry : output.getProperties().entrySet()) {
resp.headers().set(entry.getKey(), entry.getValue());
}
BytesSupplier data = output.getData();
if (data != null) {
resp.content().writeBytes(data.getAsBytes());
}

/*
* We can load the models based on the configuration file.Since this Job is
* not driven by the external connections, we could have a empty context for
* this job. We shouldn't try to send a response to ctx if this is not triggered
* by external clients.
*/
if (ctx != null) {
NettyUtils.sendHttpResponse(ctx, resp, true);
}
}

void onException(Throwable t, ChannelHandlerContext ctx) {
HttpResponseStatus status;
if (t instanceof TranslateException) {
status = HttpResponseStatus.BAD_REQUEST;
} else if (t instanceof WlmShutdownException) {
logger.info(t.getMessage());
status = HttpResponseStatus.SERVICE_UNAVAILABLE;
} else if (t instanceof WlmCapacityException) {
logger.warn(t.getMessage());
status = HttpResponseStatus.SERVICE_UNAVAILABLE;
} else {
logger.warn("Unexpected error", t);
status = HttpResponseStatus.INTERNAL_SERVER_ERROR;
}

/*
* We can load the models based on the configuration file.Since this Job is
* not driven by the external connections, we could have a empty context for
* this job. We shouldn't try to send a response to ctx if this is not triggered
* by external clients.
*/
if (ctx != null) {
NettyUtils.sendError(ctx, status, t);
}
}
}
81 changes: 3 additions & 78 deletions serving/src/main/java/ai/djl/serving/models/ModelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,16 @@
import ai.djl.ModelException;
import ai.djl.modality.Input;
import ai.djl.modality.Output;
import ai.djl.ndarray.BytesSupplier;
import ai.djl.repository.zoo.Criteria;
import ai.djl.repository.zoo.ModelNotFoundException;
import ai.djl.repository.zoo.ZooModel;
import ai.djl.serving.http.BadRequestException;
import ai.djl.serving.http.DescribeModelResponse;
import ai.djl.serving.util.ConfigManager;
import ai.djl.serving.util.NettyUtils;
import ai.djl.serving.wlm.Job;
import ai.djl.serving.wlm.ModelInfo;
import ai.djl.serving.wlm.WorkLoadManager;
import ai.djl.serving.wlm.WorkerThread;
import ai.djl.serving.wlm.util.WlmCapacityException;
import ai.djl.serving.wlm.util.WlmShutdownException;
import ai.djl.translate.TranslateException;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -252,28 +242,11 @@ public Set<String> getStartupModels() {
/**
* Runs an inference job by assigning the job to the next free worker.
*
* @param ctx the netty channel handler context where the job response will be sent
* @param job an inference job to be executed
* @return {@code true} if submit success, false otherwise.
*/
public void runJob(ChannelHandlerContext ctx, Job job) {
wlm.runJob(job)
.whenComplete(
(o, t) -> {
if (t != null) {
onException(t, ctx);
} else {
sendOutput(o, ctx);
}
logger.trace(
"Waiting time: {}, Backend time: {}",
job.getScheduled() - job.getBegin(),
System.currentTimeMillis() - job.getScheduled());
})
.exceptionally(
t -> {
onException(t, ctx);
return null;
});
public CompletableFuture<Output> runJob(Job job) {
return wlm.runJob(job);
}

/**
Expand Down Expand Up @@ -345,52 +318,4 @@ public CompletableFuture<String> workerStatus() {
return response;
});
}

void sendOutput(Output output, ChannelHandlerContext ctx) {
FullHttpResponse resp =
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, false);
for (Map.Entry<String, String> entry : output.getProperties().entrySet()) {
resp.headers().set(entry.getKey(), entry.getValue());
}
BytesSupplier data = output.getData();
if (data != null) {
resp.content().writeBytes(data.getAsBytes());
}

/*
* We can load the models based on the configuration file.Since this Job is
* not driven by the external connections, we could have a empty context for
* this job. We shouldn't try to send a response to ctx if this is not triggered
* by external clients.
*/
if (ctx != null) {
NettyUtils.sendHttpResponse(ctx, resp, true);
}
}

void onException(Throwable t, ChannelHandlerContext ctx) {
HttpResponseStatus status;
if (t instanceof TranslateException) {
status = HttpResponseStatus.BAD_REQUEST;
} else if (t instanceof WlmShutdownException) {
status = HttpResponseStatus.SERVICE_UNAVAILABLE;
logger.error("Unable to process prediction. Worker shutdown");
} else if (t instanceof WlmCapacityException) {
logger.error("Unable to process prediction. Worker capacity exceeded");
status = HttpResponseStatus.SERVICE_UNAVAILABLE;
} else {
logger.warn("Unexpected error", t);
status = HttpResponseStatus.INTERNAL_SERVER_ERROR;
}

/*
* We can load the models based on the configuration file.Since this Job is
* not driven by the external connections, we could have a empty context for
* this job. We shouldn't try to send a response to ctx if this is not triggered
* by external clients.
*/
if (ctx != null) {
NettyUtils.sendError(ctx, status, t);
}
}
}
3 changes: 1 addition & 2 deletions serving/src/test/java/ai/djl/serving/ModelServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -733,8 +733,7 @@ private void testServiceUnavailable() throws InterruptedException {
if (!System.getProperty("os.name").startsWith("Win")) {
ErrorResponse resp = JsonUtils.GSON.fromJson(result, ErrorResponse.class);
Assert.assertEquals(resp.getCode(), HttpResponseStatus.SERVICE_UNAVAILABLE.code());
Assert.assertEquals(
resp.getMessage(), "No worker is available to serve request: mlp_2");
Assert.assertEquals(resp.getMessage(), "All model workers has been shutdown: mlp_2");
}
}

Expand Down
7 changes: 2 additions & 5 deletions wlm/src/main/java/ai/djl/serving/wlm/WorkLoadManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,17 @@ public CompletableFuture<Output> runJob(Job job) {
ModelInfo modelInfo = job.getModel();
int maxWorkers = modelInfo.getMaxWorkers();
if (maxWorkers == 0) {
logger.info("All model workers has been shutdown: {}", modelInfo.getModelName());
result.completeExceptionally(
new WlmShutdownException(
"No worker is available to serve request: "
+ modelInfo.getModelName()));
"All model workers has been shutdown: " + modelInfo.getModelName()));
return result;
}
WorkerPool pool = getWorkerPoolForModel(modelInfo);
LinkedBlockingDeque<WorkerJob> queue = pool.getJobQueue();
if (!queue.offer(new WorkerJob(job, result))) {
logger.warn("Worker queue capacity exceeded for model: {}", modelInfo.getModelName());
result.completeExceptionally(
new WlmCapacityException(
"No worker is available to serve request: "
"Worker queue capacity exceeded for model: "
+ modelInfo.getModelName()));
return result;
}
Expand Down