diff --git a/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java b/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java index c746bc9acf2a1..a4836ca322035 100644 --- a/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java @@ -28,7 +28,6 @@ public ChannelActionListener(TransportChannel channel) { @Override public void onResponse(Response response) { - response.incRef(); // acquire reference that will be released by channel.sendResponse below ActionListener.run(this, l -> l.channel.sendResponse(response)); } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 4bd7f23c559e0..32347047813a3 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -41,9 +41,7 @@ import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.IOUtils; -import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; @@ -546,7 +544,7 @@ public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task, })); } - private void ensureAfterSeqNoRefreshed( + private void ensureAfterSeqNoRefreshed( IndexShard shard, ShardSearchRequest request, CheckedSupplier executable, @@ -650,27 +648,8 @@ private IndexShard getShard(ShardSearchRequest request) { return indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id()); } - private static void runAsync( - Executor executor, - CheckedSupplier executable, - ActionListener listener - ) { - executor.execute(ActionRunnable.wrap(listener, new CheckedConsumer<>() { - @Override - public void accept(ActionListener l) throws Exception { - var res = executable.get(); - try { - l.onResponse(res); - } finally { - res.decRef(); - } - } - - @Override - public String toString() { - return executable.toString(); - } - })); + private static void runAsync(Executor executor, CheckedSupplier executable, ActionListener listener) { + executor.execute(ActionRunnable.supply(listener, executable)); } /** @@ -707,7 +686,6 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchSh final RescoreDocIds rescoreDocIds = context.rescoreDocIds(); context.queryResult().setRescoreDocIds(rescoreDocIds); readerContext.setRescoreDocIds(rescoreDocIds); - // inc-ref query result because we close the SearchContext that references it in this try-with-resources block context.queryResult().incRef(); return context.queryResult(); } @@ -805,7 +783,6 @@ public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, final RescoreDocIds rescoreDocIds = searchContext.rescoreDocIds(); queryResult.setRescoreDocIds(rescoreDocIds); readerContext.setRescoreDocIds(rescoreDocIds); - // inc-ref query result because we close the SearchContext that references it in this try-with-resources block queryResult.incRef(); return queryResult; } catch (Exception e) { @@ -889,7 +866,6 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A executor.success(); } var fetchResult = searchContext.fetchResult(); - // inc-ref fetch result because we close the SearchContext that references it in this try-with-resources block fetchResult.incRef(); return fetchResult; } catch (Exception e) { diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 6bd80e088e66f..becc9b9e53233 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -378,10 +378,7 @@ public void onFailure(Exception e) { null ), new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()), - result.delegateFailure((l, r) -> { - r.incRef(); - l.onResponse(r); - }) + result ); final SearchPhaseResult searchPhaseResult = result.get(); try { @@ -394,7 +391,7 @@ public void onFailure(Exception e) { ); PlainActionFuture listener = new PlainActionFuture<>(); service.executeFetchPhase(req, new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()), listener); - listener.get(); + listener.get().decRef(); if (useScroll) { // have to free context since this test does not remove the index from IndicesService. service.freeReaderContext(searchPhaseResult.getContextId()); @@ -1051,6 +1048,7 @@ public void onResponse(SearchPhaseResult searchPhaseResult) { // make sure that the wrapper is called when the query is actually executed assertEquals(6, numWrapInvocations.get()); } finally { + searchPhaseResult.decRef(); latch.countDown(); } } @@ -1350,6 +1348,7 @@ public void onResponse(SearchPhaseResult result) { assertNotNull(result.queryResult().topDocs()); assertNotNull(result.queryResult().aggregations()); } finally { + result.decRef(); latch.countDown(); } } @@ -1380,6 +1379,7 @@ public void onResponse(SearchPhaseResult result) { assertNotNull(result.queryResult().topDocs()); assertNotNull(result.queryResult().aggregations()); } finally { + result.decRef(); latch.countDown(); } } @@ -1408,6 +1408,7 @@ public void onResponse(SearchPhaseResult result) { assertThat(result, instanceOf(QuerySearchResult.class)); assertTrue(result.queryResult().isNull()); } finally { + result.decRef(); latch.countDown(); } } @@ -1548,6 +1549,7 @@ public void testCancelQueryPhaseEarly() throws Exception { @Override public void onResponse(SearchPhaseResult searchPhaseResult) { service.freeReaderContext(searchPhaseResult.getContextId()); + searchPhaseResult.decRef(); latch1.countDown(); } @@ -1689,7 +1691,7 @@ public void onFailure(Exception e) { client().clearScroll(clearScrollRequest); } - public void testWaitOnRefresh() throws ExecutionException, InterruptedException { + public void testWaitOnRefresh() { createIndex("index"); final SearchService service = getInstanceFromNode(SearchService.class); final IndicesService indicesService = getInstanceFromNode(IndicesService.class); @@ -1703,6 +1705,7 @@ public void testWaitOnRefresh() throws ExecutionException, InterruptedException assertEquals(RestStatus.CREATED, response.status()); SearchShardTask task = new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()); + PlainActionFuture future = new PlainActionFuture<>(); ShardSearchRequest request = new ShardSearchRequest( OriginalIndices.NONE, searchRequest, @@ -1716,12 +1719,13 @@ public void testWaitOnRefresh() throws ExecutionException, InterruptedException null, null ); - PlainActionFuture future = new PlainActionFuture<>(); - service.executeQueryPhase(request, task, future.delegateFailure((l, r) -> { - assertEquals(1, r.queryResult().getTotalHits().value); - l.onResponse(null); - })); - future.get(); + service.executeQueryPhase(request, task, future); + SearchPhaseResult searchPhaseResult = future.actionGet(); + try { + assertEquals(1, searchPhaseResult.queryResult().getTotalHits().value); + } finally { + searchPhaseResult.decRef(); + } } public void testWaitOnRefreshFailsWithRefreshesDisabled() { @@ -1885,6 +1889,7 @@ public void testDfsQueryPhaseRewrite() { -1, null ); + PlainActionFuture plainActionFuture = new PlainActionFuture<>(); final Engine.SearcherSupplier reader = indexShard.acquireSearcherSupplier(); ReaderContext context = service.createAndPutReaderContext( request, @@ -1893,14 +1898,13 @@ public void testDfsQueryPhaseRewrite() { reader, SearchService.KEEPALIVE_INTERVAL_SETTING.get(Settings.EMPTY).millis() ); - PlainActionFuture plainActionFuture = new PlainActionFuture<>(); service.executeQueryPhase( new QuerySearchRequest(null, context.id(), request, new AggregatedDfs(Map.of(), Map.of(), 10)), new SearchShardTask(42L, "", "", "", null, Collections.emptyMap()), plainActionFuture ); - plainActionFuture.actionGet(); + plainActionFuture.actionGet().decRef(); assertThat(((TestRewriteCounterQueryBuilder) request.source().query()).asyncRewriteCount, equalTo(1)); final ShardSearchContextId contextId = context.id(); assertTrue(service.freeReaderContext(contextId)); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java index 7aab281f4f7ed..93cd6bd32dbab 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java @@ -85,12 +85,7 @@ protected void doExecute( try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID)) { long offsetAfterRead = sessionReader.readFileBytes(fileName, reference); long offsetBeforeRead = offsetAfterRead - reference.length(); - var chunk = new GetCcrRestoreFileChunkResponse(offsetBeforeRead, reference); - try { - listener.onResponse(chunk); - } finally { - chunk.decRef(); - } + listener.onResponse(new GetCcrRestoreFileChunkResponse(offsetBeforeRead, reference)); } } catch (IOException e) { listener.onFailure(e); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkActionTests.java index 61866dbf2029f..629b98a25ba4b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkActionTests.java @@ -126,7 +126,7 @@ public void testRequestedShardIdMustBeConsistentWithSessionShardId() { final PlainActionFuture future1 = new PlainActionFuture<>(); action.doExecute(mock(Task.class), request1, future1); // The actual response content does not matter as long as the future executes without any error - future1.actionGet(); + future1.actionGet().decRef(); // 2. Inconsistent requested ShardId final var request2 = new GetCcrRestoreFileChunkRequest( diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeResponse.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeResponse.java index 3509b41b2f4c4..5904c03a01e44 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeResponse.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeResponse.java @@ -20,7 +20,7 @@ import java.util.Objects; public final class ExchangeResponse extends TransportResponse implements Releasable { - private final RefCounted counted = AbstractRefCounted.of(this::closeInternal); + private final RefCounted counted = AbstractRefCounted.of(this::close); private final Page page; private final boolean finished; private boolean pageTaken; @@ -98,10 +98,6 @@ public boolean hasReferences() { @Override public void close() { - counted.decRef(); - } - - private void closeInternal() { if (pageTaken == false && page != null) { page.releaseBlocks(); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java index a3af32dc55b5c..9f921c40851d4 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java @@ -138,10 +138,8 @@ private void notifyListeners() { } finally { promised.release(); } - try (response) { - onChanged(); - listener.onResponse(response); - } + onChanged(); + listener.onResponse(response); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java index f44131c006b94..1b98d69c313ca 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java @@ -412,6 +412,10 @@ public void sendResponse(TransportResponse transportResponse) throws IOException } ExchangeResponse newResp = new ExchangeResponse(page, origResp.finished()); origResp.decRef(); + while (origResp.hasReferences()) { + newResp.incRef(); + origResp.decRef(); + } super.sendResponse(newResp); } };