Skip to content

Commit

Permalink
Add RestChunkedToXContentListener version that releases transport res…
Browse files Browse the repository at this point in the history
…ponses (elastic#102607)

This is needed as part of elastic#102030, we need to be able to release
search/multi-search and maybe other transport messages after they've
been serialized as REST responses.
  • Loading branch information
original-brownbear authored Nov 24, 2023
1 parent 441b644 commit f896249
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.rest.action;

import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.rest.ChunkedRestResponseBody;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestResponse;
Expand Down Expand Up @@ -37,10 +38,17 @@ public RestChunkedToXContentListener(RestChannel channel, ToXContent.Params para
@Override
protected void processResponse(Response response) throws IOException {
channel.sendResponse(
RestResponse.chunked(getRestStatus(response), ChunkedRestResponseBody.fromXContent(response, params, channel, null))
RestResponse.chunked(
getRestStatus(response),
ChunkedRestResponseBody.fromXContent(response, params, channel, releasableFromResponse(response))
)
);
}

protected Releasable releasableFromResponse(Response response) {
return null;
}

protected RestStatus getRestStatus(Response response) {
return RestStatus.OK;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.rest.action;

import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.rest.RestChannel;

/**
* Same as {@link RestChunkedToXContentListener} but decrements the ref count on the response it receives by one after serialization of the
* response.
*/
public class RestRefCountedChunkedToXContentListener<Response extends ChunkedToXContent & RefCounted> extends RestChunkedToXContentListener<
Response> {
public RestRefCountedChunkedToXContentListener(RestChannel channel) {
super(channel);
}

@Override
protected Releasable releasableFromResponse(Response response) {
response.mustIncRef();
return Releasables.assertOnce(response::decRef);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.rest.action.RestChunkedToXContentListener;
import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.usage.SearchUsageHolder;
import org.elasticsearch.xcontent.XContent;
Expand Down Expand Up @@ -82,7 +82,11 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
);
return channel -> {
final RestCancellableNodeClient cancellableClient = new RestCancellableNodeClient(client, request.getHttpChannel());
cancellableClient.execute(MultiSearchAction.INSTANCE, multiSearchRequest, new RestChunkedToXContentListener<>(channel));
cancellableClient.execute(
MultiSearchAction.INSTANCE,
multiSearchRequest,
new RestRefCountedChunkedToXContentListener<>(channel)
);
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestActions;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.rest.action.RestChunkedToXContentListener;
import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
Expand Down Expand Up @@ -121,7 +121,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC

return channel -> {
RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel());
cancelClient.execute(SearchAction.INSTANCE, searchRequest, new RestChunkedToXContentListener<>(channel));
cancelClient.execute(SearchAction.INSTANCE, searchRequest, new RestRefCountedChunkedToXContentListener<>(channel));
};
}

Expand Down

0 comments on commit f896249

Please sign in to comment.