diff --git a/test/external-modules/latency-simulating-directory/src/internalClusterTest/java/org/elasticsearch/test/simulatedlatencyrepo/LatencySimulatingBlobStoreRepositoryTests.java b/test/external-modules/latency-simulating-directory/src/internalClusterTest/java/org/elasticsearch/test/simulatedlatencyrepo/LatencySimulatingBlobStoreRepositoryTests.java index 77e151ed22f32..6d49971df4f0d 100644 --- a/test/external-modules/latency-simulating-directory/src/internalClusterTest/java/org/elasticsearch/test/simulatedlatencyrepo/LatencySimulatingBlobStoreRepositoryTests.java +++ b/test/external-modules/latency-simulating-directory/src/internalClusterTest/java/org/elasticsearch/test/simulatedlatencyrepo/LatencySimulatingBlobStoreRepositoryTests.java @@ -36,6 +36,7 @@ import java.util.concurrent.atomic.LongAdder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -136,9 +137,9 @@ public void testRetrieveSnapshots() throws Exception { assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0)); logger.info("--> run a search"); - var searchResponse = client.prepareSearch("test-idx").setQuery(QueryBuilders.termQuery("text", "sometext")).get(); - - assertThat(searchResponse.getHits().getTotalHits().value, greaterThan(0L)); - assertThat(COUNTS.intValue(), greaterThan(0)); + assertResponse(client.prepareSearch("test-idx").setQuery(QueryBuilders.termQuery("text", "sometext")), searchResponse -> { + assertThat(searchResponse.getHits().getTotalHits().value, greaterThan(0L)); + assertThat(COUNTS.intValue(), greaterThan(0)); + }); } } diff --git a/x-pack/plugin/async-search/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/search/AsyncSearchSecurityIT.java b/x-pack/plugin/async-search/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/search/AsyncSearchSecurityIT.java index 21a2c2295c809..1819ad7960006 100644 --- a/x-pack/plugin/async-search/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/search/AsyncSearchSecurityIT.java +++ b/x-pack/plugin/async-search/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/search/AsyncSearchSecurityIT.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.search; import org.apache.http.util.EntityUtils; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; @@ -182,15 +183,19 @@ private void testCase(String user, String other) throws Exception { private SearchHit[] getSearchHits(String asyncId, String user) throws IOException { final Response resp = getAsyncSearch(asyncId, user); assertOK(resp); - AsyncSearchResponse searchResponse = AsyncSearchResponse.fromXContent( + SearchResponse searchResponse = AsyncSearchResponse.fromXContent( XContentHelper.createParser( NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, new BytesArray(EntityUtils.toByteArray(resp.getEntity())), XContentType.JSON ) - ); - return searchResponse.getSearchResponse().getHits().getHits(); + ).getSearchResponse(); + try { + return searchResponse.getHits().getHits(); + } finally { + searchResponse.decRef(); + } } public void testAuthorizationOfPointInTime() throws Exception { diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java index 6d7b1d943f10a..1e6a4794b14ae 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java @@ -865,23 +865,25 @@ protected void doNextSearch(long waitTimeInNanos, ActionListener } catch (IOException e) { listener.onFailure(e); } - SearchResponse response = new SearchResponse( - null, - new Aggregations(Collections.singletonList(result)), - null, - false, - null, - null, - 1, - null, - 1, - 1, - 0, - 0, - ShardSearchFailure.EMPTY_ARRAY, - null + ActionListener.respondAndRelease( + listener, + new SearchResponse( + null, + new Aggregations(Collections.singletonList(result)), + null, + false, + null, + null, + 1, + null, + 1, + 1, + 0, + 0, + ShardSearchFailure.EMPTY_ARRAY, + null + ) ); - listener.onResponse(response); } @Override diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java index f858544e4dd2b..75577ba458bae 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java @@ -105,23 +105,25 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return null; } })); - final SearchResponse response = new SearchResponse( - new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0), - aggs, - null, - false, - null, - null, - 1, - null, - 1, - 1, - 0, - 0, - new ShardSearchFailure[0], - null + ActionListener.respondAndRelease( + nextPhase, + new SearchResponse( + new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0), + aggs, + null, + false, + null, + null, + 1, + null, + 1, + 1, + 0, + 0, + new ShardSearchFailure[0], + null + ) ); - nextPhase.onResponse(response); } @Override diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java index 0a1179e4224aa..8504bbb6c1e05 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java @@ -178,36 +178,41 @@ private void testSourceHasChanged( TimeValue delay, Tuple expectedRangeQueryBounds ) throws InterruptedException { - doAnswer(withResponse(newSearchResponse(totalHits))).when(client).execute(eq(TransportSearchAction.TYPE), any(), any()); - String transformId = getTestName(); - TransformConfig transformConfig = newTransformConfigWithDateHistogram( - transformId, - transformVersion, - dateHistogramField, - dateHistogramInterval, - delay - ); - TimeBasedCheckpointProvider provider = newCheckpointProvider(transformConfig); + final SearchResponse searchResponse = newSearchResponse(totalHits); + try { + doAnswer(withResponse(searchResponse)).when(client).execute(eq(TransportSearchAction.TYPE), any(), any()); + String transformId = getTestName(); + TransformConfig transformConfig = newTransformConfigWithDateHistogram( + transformId, + transformVersion, + dateHistogramField, + dateHistogramInterval, + delay + ); + TimeBasedCheckpointProvider provider = newCheckpointProvider(transformConfig); - SetOnce hasChangedHolder = new SetOnce<>(); - SetOnce exceptionHolder = new SetOnce<>(); - CountDownLatch latch = new CountDownLatch(1); - provider.sourceHasChanged( - lastCheckpoint, - new LatchedActionListener<>(ActionListener.wrap(hasChangedHolder::set, exceptionHolder::set), latch) - ); - assertThat(latch.await(100, TimeUnit.MILLISECONDS), is(true)); + SetOnce hasChangedHolder = new SetOnce<>(); + SetOnce exceptionHolder = new SetOnce<>(); + CountDownLatch latch = new CountDownLatch(1); + provider.sourceHasChanged( + lastCheckpoint, + new LatchedActionListener<>(ActionListener.wrap(hasChangedHolder::set, exceptionHolder::set), latch) + ); + assertThat(latch.await(100, TimeUnit.MILLISECONDS), is(true)); - ArgumentCaptor searchRequestArgumentCaptor = ArgumentCaptor.forClass(SearchRequest.class); - verify(client).execute(eq(TransportSearchAction.TYPE), searchRequestArgumentCaptor.capture(), any()); - SearchRequest searchRequest = searchRequestArgumentCaptor.getValue(); - BoolQueryBuilder boolQuery = (BoolQueryBuilder) searchRequest.source().query(); - RangeQueryBuilder rangeQuery = (RangeQueryBuilder) boolQuery.filter().get(1); - assertThat(rangeQuery.from(), is(equalTo(expectedRangeQueryBounds.v1()))); - assertThat(rangeQuery.to(), is(equalTo(expectedRangeQueryBounds.v2()))); + ArgumentCaptor searchRequestArgumentCaptor = ArgumentCaptor.forClass(SearchRequest.class); + verify(client).execute(eq(TransportSearchAction.TYPE), searchRequestArgumentCaptor.capture(), any()); + SearchRequest searchRequest = searchRequestArgumentCaptor.getValue(); + BoolQueryBuilder boolQuery = (BoolQueryBuilder) searchRequest.source().query(); + RangeQueryBuilder rangeQuery = (RangeQueryBuilder) boolQuery.filter().get(1); + assertThat(rangeQuery.from(), is(equalTo(expectedRangeQueryBounds.v1()))); + assertThat(rangeQuery.to(), is(equalTo(expectedRangeQueryBounds.v2()))); - assertThat(hasChangedHolder.get(), is(equalTo(expectedHasChangedValue))); - assertThat(exceptionHolder.get(), is(nullValue())); + assertThat(hasChangedHolder.get(), is(equalTo(expectedHasChangedValue))); + assertThat(exceptionHolder.get(), is(nullValue())); + } finally { + searchResponse.decRef(); + } } public void testCreateNextCheckpoint_NoDelay() throws InterruptedException { diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java index b1c9edc0fab0a..8ee7e902285c9 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java @@ -541,26 +541,28 @@ protected void && "the_pit_id+++".equals(searchRequest.pointInTimeBuilder().getEncodedId())) { listener.onFailure(new SearchContextMissingException(new ShardSearchContextId("sc_missing", 42))); } else { - SearchResponse response = new SearchResponse( - new SearchHits(new SearchHit[] { new SearchHit(1) }, new TotalHits(1L, TotalHits.Relation.EQUAL_TO), 1.0f), - // Simulate completely null aggs - null, - new Suggest(Collections.emptyList()), - false, - false, - new SearchProfileResults(Collections.emptyMap()), - 1, - null, - 1, - 1, - 0, - 0, - ShardSearchFailure.EMPTY_ARRAY, - SearchResponse.Clusters.EMPTY, - // copy the pit from the request - searchRequest.pointInTimeBuilder() != null ? searchRequest.pointInTimeBuilder().getEncodedId() + "+" : null + ActionListener.respondAndRelease( + listener, + (Response) new SearchResponse( + new SearchHits(new SearchHit[] { new SearchHit(1) }, new TotalHits(1L, TotalHits.Relation.EQUAL_TO), 1.0f), + // Simulate completely null aggs + null, + new Suggest(Collections.emptyList()), + false, + false, + new SearchProfileResults(Collections.emptyMap()), + 1, + null, + 1, + 1, + 0, + 0, + ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY, + // copy the pit from the request + searchRequest.pointInTimeBuilder() != null ? searchRequest.pointInTimeBuilder().getEncodedId() + "+" : null + ) ); - listener.onResponse((Response) response); } return; diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java index 5c6539d0a5045..4489f114d1d58 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java @@ -221,7 +221,8 @@ protected void onAbort() { @Override void doGetInitialProgress(SearchRequest request, ActionListener responseListener) { - responseListener.onResponse( + ActionListener.respondAndRelease( + responseListener, new SearchResponse( new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), 0.0f), // Simulate completely null aggs @@ -388,29 +389,33 @@ public void testDoProcessAggNullCheck() { ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY ); - AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - Function searchFunction = searchRequest -> searchResponse; - Function bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100); - - TransformAuditor auditor = mock(TransformAuditor.class); - TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class)); - - MockedTransformIndexer indexer = createMockIndexer( - config, - state, - searchFunction, - bulkFunction, - null, - threadPool, - ThreadPool.Names.GENERIC, - auditor, - context - ); + try { + AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); + Function searchFunction = searchRequest -> searchResponse; + Function bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100); + + TransformAuditor auditor = mock(TransformAuditor.class); + TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class)); + + MockedTransformIndexer indexer = createMockIndexer( + config, + state, + searchFunction, + bulkFunction, + null, + threadPool, + ThreadPool.Names.GENERIC, + auditor, + context + ); - IterationResult newPosition = indexer.doProcess(searchResponse); - assertThat(newPosition.getToIndex().collect(Collectors.toList()), is(empty())); - assertThat(newPosition.getPosition(), is(nullValue())); - assertThat(newPosition.isDone(), is(true)); + IterationResult newPosition = indexer.doProcess(searchResponse); + assertThat(newPosition.getToIndex().collect(Collectors.toList()), is(empty())); + assertThat(newPosition.getPosition(), is(nullValue())); + assertThat(newPosition.isDone(), is(true)); + } finally { + searchResponse.decRef(); + } } public void testScriptError() throws Exception { @@ -524,58 +529,61 @@ public void testRetentionPolicyDeleteByQueryThrowsIrrecoverable() throws Excepti ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY ); - - AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - Function searchFunction = searchRequest -> searchResponse; - - Function bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100); - - Function deleteByQueryFunction = deleteByQueryRequest -> { - throw new SearchPhaseExecutionException( - "query", - "Partial shards failure", - new ShardSearchFailure[] { - new ShardSearchFailure( - new ElasticsearchParseException("failed to parse date field", new IllegalArgumentException("illegal format")) - ) } + try { + AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); + Function searchFunction = searchRequest -> searchResponse; + + Function bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100); + + Function deleteByQueryFunction = deleteByQueryRequest -> { + throw new SearchPhaseExecutionException( + "query", + "Partial shards failure", + new ShardSearchFailure[] { + new ShardSearchFailure( + new ElasticsearchParseException("failed to parse date field", new IllegalArgumentException("illegal format")) + ) } + ); + }; + + final AtomicBoolean failIndexerCalled = new AtomicBoolean(false); + final AtomicReference failureMessage = new AtomicReference<>(); + + MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor(); + TransformContext.Listener contextListener = createContextListener(failIndexerCalled, failureMessage); + TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener); + + MockedTransformIndexer indexer = createMockIndexer( + config, + state, + searchFunction, + bulkFunction, + deleteByQueryFunction, + threadPool, + ThreadPool.Names.GENERIC, + auditor, + context ); - }; - final AtomicBoolean failIndexerCalled = new AtomicBoolean(false); - final AtomicReference failureMessage = new AtomicReference<>(); + final CountDownLatch latch = indexer.newLatch(1); - MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor(); - TransformContext.Listener contextListener = createContextListener(failIndexerCalled, failureMessage); - TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener); + indexer.start(); + assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); + assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); + assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); - MockedTransformIndexer indexer = createMockIndexer( - config, - state, - searchFunction, - bulkFunction, - deleteByQueryFunction, - threadPool, - ThreadPool.Names.GENERIC, - auditor, - context - ); - - final CountDownLatch latch = indexer.newLatch(1); - - indexer.start(); - assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); - assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); - assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); - - latch.countDown(); - assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS); - assertTrue(failIndexerCalled.get()); - assertThat( - failureMessage.get(), - matchesRegex( - "task encountered irrecoverable failure: org.elasticsearch.ElasticsearchParseException: failed to parse date field;.*" - ) - ); + latch.countDown(); + assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS); + assertTrue(failIndexerCalled.get()); + assertThat( + failureMessage.get(), + matchesRegex( + "task encountered irrecoverable failure: org.elasticsearch.ElasticsearchParseException: failed to parse date field;.*" + ) + ); + } finally { + searchResponse.decRef(); + } } public void testRetentionPolicyDeleteByQueryThrowsTemporaryProblem() throws Exception { @@ -614,61 +622,64 @@ public void testRetentionPolicyDeleteByQueryThrowsTemporaryProblem() throws Exce ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY ); - - AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - Function searchFunction = searchRequest -> searchResponse; - - Function bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100); - - Function deleteByQueryFunction = deleteByQueryRequest -> { - throw new SearchPhaseExecutionException( - "query", - "Partial shards failure", - new ShardSearchFailure[] { new ShardSearchFailure(new ElasticsearchTimeoutException("timed out during dbq")) } + try { + AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); + Function searchFunction = searchRequest -> searchResponse; + + Function bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100); + + Function deleteByQueryFunction = deleteByQueryRequest -> { + throw new SearchPhaseExecutionException( + "query", + "Partial shards failure", + new ShardSearchFailure[] { new ShardSearchFailure(new ElasticsearchTimeoutException("timed out during dbq")) } + ); + }; + + final AtomicBoolean failIndexerCalled = new AtomicBoolean(false); + final AtomicReference failureMessage = new AtomicReference<>(); + + MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor(); + auditor.addExpectation( + new MockTransformAuditor.SeenAuditExpectation( + "timed out during dbq", + Level.WARNING, + transformId, + "Transform encountered an exception: [org.elasticsearch.ElasticsearchTimeoutException: timed out during dbq];" + + " Will automatically retry [1/10]" + ) + ); + TransformContext.Listener contextListener = createContextListener(failIndexerCalled, failureMessage); + TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener); + + MockedTransformIndexer indexer = createMockIndexer( + config, + state, + searchFunction, + bulkFunction, + deleteByQueryFunction, + threadPool, + ThreadPool.Names.GENERIC, + auditor, + context ); - }; - - final AtomicBoolean failIndexerCalled = new AtomicBoolean(false); - final AtomicReference failureMessage = new AtomicReference<>(); - - MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor(); - auditor.addExpectation( - new MockTransformAuditor.SeenAuditExpectation( - "timed out during dbq", - Level.WARNING, - transformId, - "Transform encountered an exception: [org.elasticsearch.ElasticsearchTimeoutException: timed out during dbq];" - + " Will automatically retry [1/10]" - ) - ); - TransformContext.Listener contextListener = createContextListener(failIndexerCalled, failureMessage); - TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener); - - MockedTransformIndexer indexer = createMockIndexer( - config, - state, - searchFunction, - bulkFunction, - deleteByQueryFunction, - threadPool, - ThreadPool.Names.GENERIC, - auditor, - context - ); - final CountDownLatch latch = indexer.newLatch(1); + final CountDownLatch latch = indexer.newLatch(1); - indexer.start(); - assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); - assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); - assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); + indexer.start(); + assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); + assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); + assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); - latch.countDown(); - assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS); - assertFalse(failIndexerCalled.get()); - assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); - auditor.assertAllExpectationsMatched(); - assertEquals(1, context.getFailureCount()); + latch.countDown(); + assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS); + assertFalse(failIndexerCalled.get()); + assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); + auditor.assertAllExpectationsMatched(); + assertEquals(1, context.getFailureCount()); + } finally { + searchResponse.decRef(); + } } public void testFailureCounterIsResetOnSuccess() throws Exception { @@ -707,72 +718,75 @@ public void testFailureCounterIsResetOnSuccess() throws Exception { ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY ); - - AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - Function searchFunction = new Function<>() { - final AtomicInteger calls = new AtomicInteger(0); - - @Override - public SearchResponse apply(SearchRequest searchRequest) { - int call = calls.getAndIncrement(); - if (call == 0) { - throw new SearchPhaseExecutionException( - "query", - "Partial shards failure", - new ShardSearchFailure[] { new ShardSearchFailure(new Exception()) } - ); + try { + AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); + Function searchFunction = new Function<>() { + final AtomicInteger calls = new AtomicInteger(0); + + @Override + public SearchResponse apply(SearchRequest searchRequest) { + int call = calls.getAndIncrement(); + if (call == 0) { + throw new SearchPhaseExecutionException( + "query", + "Partial shards failure", + new ShardSearchFailure[] { new ShardSearchFailure(new Exception()) } + ); + } + return searchResponse; } - return searchResponse; - } - }; + }; - Function bulkFunction = request -> new BulkResponse(new BulkItemResponse[0], 1); + Function bulkFunction = request -> new BulkResponse(new BulkItemResponse[0], 1); - final AtomicBoolean failIndexerCalled = new AtomicBoolean(false); - final AtomicReference failureMessage = new AtomicReference<>(); + final AtomicBoolean failIndexerCalled = new AtomicBoolean(false); + final AtomicReference failureMessage = new AtomicReference<>(); - MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor(); - TransformContext.Listener contextListener = createContextListener(failIndexerCalled, failureMessage); - TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener); + MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor(); + TransformContext.Listener contextListener = createContextListener(failIndexerCalled, failureMessage); + TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener); - MockedTransformIndexer indexer = createMockIndexer( - config, - state, - searchFunction, - bulkFunction, - null, - threadPool, - ThreadPool.Names.GENERIC, - auditor, - context - ); + MockedTransformIndexer indexer = createMockIndexer( + config, + state, + searchFunction, + bulkFunction, + null, + threadPool, + ThreadPool.Names.GENERIC, + auditor, + context + ); - final CountDownLatch latch = indexer.newLatch(1); + final CountDownLatch latch = indexer.newLatch(1); - indexer.start(); - assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); - assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); - assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); + indexer.start(); + assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); + assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); + assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); - latch.countDown(); - assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS); - assertFalse(failIndexerCalled.get()); - assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); - assertEquals(1, context.getFailureCount()); + latch.countDown(); + assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS); + assertFalse(failIndexerCalled.get()); + assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); + assertEquals(1, context.getFailureCount()); - final CountDownLatch secondLatch = indexer.newLatch(1); + final CountDownLatch secondLatch = indexer.newLatch(1); - indexer.start(); - assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); - assertBusy(() -> assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()))); - assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); + indexer.start(); + assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); + assertBusy(() -> assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()))); + assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); - secondLatch.countDown(); - assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS); - assertFalse(failIndexerCalled.get()); - assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); - auditor.assertAllExpectationsMatched(); - assertEquals(0, context.getFailureCount()); + secondLatch.countDown(); + assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS); + assertFalse(failIndexerCalled.get()); + assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); + auditor.assertAllExpectationsMatched(); + assertEquals(0, context.getFailureCount()); + } finally { + searchResponse.decRef(); + } } // tests throttling of audits on logs based on repeated exception types