From 514831fe81e72239765c3e131b6beb2c8564a296 Mon Sep 17 00:00:00 2001 From: Movva Ajaykumar Date: Mon, 8 Aug 2022 17:23:40 +0530 Subject: [PATCH] Added Point In Time Node Stats API ServiceLayer Changes (#4030) * Adds Node Stats api changes for the Point in time Signed-off-by: Ajay Kumar Movva --- .../index/search/stats/SearchStats.java | 56 +++++++++++++++++++ .../index/search/stats/ShardSearchStats.java | 17 ++++++ .../index/search/stats/SearchStatsTests.java | 9 ++- .../search/CreatePitSingleNodeTests.java | 39 +++++++++++++ .../search/DeletePitMultiNodeTests.java | 24 ++++++++ .../opensearch/search/PitMultiNodeTests.java | 18 ++++++ .../opensearch/search/SearchServiceTests.java | 17 +++++- 7 files changed, 175 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java b/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java index fe23000902608..38b18355fd98d 100644 --- a/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java +++ b/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java @@ -32,6 +32,7 @@ package org.opensearch.index.search.stats; +import org.opensearch.Version; import org.opensearch.common.Nullable; import org.opensearch.common.Strings; import org.opensearch.common.io.stream.StreamInput; @@ -77,6 +78,10 @@ public static class Stats implements Writeable, ToXContentFragment { private long suggestTimeInMillis; private long suggestCurrent; + private long pitCount; + private long pitTimeInMillis; + private long pitCurrent; + private Stats() { // for internal use, initializes all counts to 0 } @@ -91,6 +96,9 @@ public Stats( long scrollCount, long scrollTimeInMillis, long scrollCurrent, + long pitCount, + long pitTimeInMillis, + long pitCurrent, long suggestCount, long suggestTimeInMillis, long suggestCurrent @@ -110,6 +118,10 @@ public Stats( this.suggestCount = suggestCount; this.suggestTimeInMillis = suggestTimeInMillis; this.suggestCurrent = suggestCurrent; + + this.pitCount = pitCount; + this.pitTimeInMillis = pitTimeInMillis; + this.pitCurrent = pitCurrent; } private Stats(StreamInput in) throws IOException { @@ -128,6 +140,12 @@ private Stats(StreamInput in) throws IOException { suggestCount = in.readVLong(); suggestTimeInMillis = in.readVLong(); suggestCurrent = in.readVLong(); + + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + pitCount = in.readVLong(); + pitTimeInMillis = in.readVLong(); + pitCurrent = in.readVLong(); + } } public void add(Stats stats) { @@ -146,6 +164,10 @@ public void add(Stats stats) { suggestCount += stats.suggestCount; suggestTimeInMillis += stats.suggestTimeInMillis; suggestCurrent += stats.suggestCurrent; + + pitCount += stats.pitCount; + pitTimeInMillis += stats.pitTimeInMillis; + pitCurrent += stats.pitCurrent; } public void addForClosingShard(Stats stats) { @@ -162,6 +184,10 @@ public void addForClosingShard(Stats stats) { suggestCount += stats.suggestCount; suggestTimeInMillis += stats.suggestTimeInMillis; + + pitCount += stats.pitCount; + pitTimeInMillis += stats.pitTimeInMillis; + pitCurrent += stats.pitCurrent; } public long getQueryCount() { @@ -212,6 +238,22 @@ public long getScrollCurrent() { return scrollCurrent; } + public long getPitCount() { + return pitCount; + } + + public TimeValue getPitTime() { + return new TimeValue(pitTimeInMillis); + } + + public long getPitTimeInMillis() { + return pitTimeInMillis; + } + + public long getPitCurrent() { + return pitCurrent; + } + public long getSuggestCount() { return suggestCount; } @@ -249,6 +291,12 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(suggestCount); out.writeVLong(suggestTimeInMillis); out.writeVLong(suggestCurrent); + + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeVLong(pitCount); + out.writeVLong(pitTimeInMillis); + out.writeVLong(pitCurrent); + } } @Override @@ -265,6 +313,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.humanReadableField(Fields.SCROLL_TIME_IN_MILLIS, Fields.SCROLL_TIME, getScrollTime()); builder.field(Fields.SCROLL_CURRENT, scrollCurrent); + builder.field(Fields.PIT_TOTAL, pitCount); + builder.humanReadableField(Fields.PIT_TIME_IN_MILLIS, Fields.PIT_TIME, getPitTime()); + builder.field(Fields.PIT_CURRENT, pitCurrent); + builder.field(Fields.SUGGEST_TOTAL, suggestCount); builder.humanReadableField(Fields.SUGGEST_TIME_IN_MILLIS, Fields.SUGGEST_TIME, getSuggestTime()); builder.field(Fields.SUGGEST_CURRENT, suggestCurrent); @@ -385,6 +437,10 @@ static final class Fields { static final String SCROLL_TIME = "scroll_time"; static final String SCROLL_TIME_IN_MILLIS = "scroll_time_in_millis"; static final String SCROLL_CURRENT = "scroll_current"; + static final String PIT_TOTAL = "point_in_time_total"; + static final String PIT_TIME = "point_in_time_time"; + static final String PIT_TIME_IN_MILLIS = "point_in_time_time_in_millis"; + static final String PIT_CURRENT = "point_in_time_current"; static final String SUGGEST_TOTAL = "suggest_total"; static final String SUGGEST_TIME = "suggest_time"; static final String SUGGEST_TIME_IN_MILLIS = "suggest_time_in_millis"; diff --git a/server/src/main/java/org/opensearch/index/search/stats/ShardSearchStats.java b/server/src/main/java/org/opensearch/index/search/stats/ShardSearchStats.java index 3ef3571c75e59..6d0eb3a5949ca 100644 --- a/server/src/main/java/org/opensearch/index/search/stats/ShardSearchStats.java +++ b/server/src/main/java/org/opensearch/index/search/stats/ShardSearchStats.java @@ -187,6 +187,18 @@ public void onFreeScrollContext(ReaderContext readerContext) { totalStats.scrollMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - readerContext.getStartTimeInNano())); } + @Override + public void onNewPitContext(ReaderContext readerContext) { + totalStats.pitCurrent.inc(); + } + + @Override + public void onFreePitContext(ReaderContext readerContext) { + totalStats.pitCurrent.dec(); + assert totalStats.pitCurrent.count() >= 0; + totalStats.pitMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - readerContext.getStartTimeInNano())); + } + /** * Holder of statistics values * @@ -203,10 +215,12 @@ static final class StatsHolder { * for one-thousand times as long (i.e., scrolls that execute for almost twelve days on average). */ final MeanMetric scrollMetric = new MeanMetric(); + final MeanMetric pitMetric = new MeanMetric(); final MeanMetric suggestMetric = new MeanMetric(); final CounterMetric queryCurrent = new CounterMetric(); final CounterMetric fetchCurrent = new CounterMetric(); final CounterMetric scrollCurrent = new CounterMetric(); + final CounterMetric pitCurrent = new CounterMetric(); final CounterMetric suggestCurrent = new CounterMetric(); SearchStats.Stats stats() { @@ -220,6 +234,9 @@ SearchStats.Stats stats() { scrollMetric.count(), TimeUnit.MICROSECONDS.toMillis(scrollMetric.sum()), scrollCurrent.count(), + pitMetric.count(), + TimeUnit.MICROSECONDS.toMillis(pitMetric.sum()), + pitCurrent.count(), suggestMetric.count(), TimeUnit.NANOSECONDS.toMillis(suggestMetric.sum()), suggestCurrent.count() diff --git a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java index 4682d35411b78..7d2d8e38d066e 100644 --- a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java +++ b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java @@ -45,9 +45,9 @@ public void testShardLevelSearchGroupStats() throws Exception { // let's create two dummy search stats with groups Map groupStats1 = new HashMap<>(); Map groupStats2 = new HashMap<>(); - groupStats2.put("group1", new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1)); - SearchStats searchStats1 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats1); - SearchStats searchStats2 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats2); + groupStats2.put("group1", new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1)); + SearchStats searchStats1 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats1); + SearchStats searchStats2 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats2); // adding these two search stats and checking group stats are correct searchStats1.add(searchStats2); @@ -75,6 +75,9 @@ private static void assertStats(Stats stats, long equalTo) { assertEquals(equalTo, stats.getScrollCount()); assertEquals(equalTo, stats.getScrollTimeInMillis()); assertEquals(equalTo, stats.getScrollCurrent()); + assertEquals(equalTo, stats.getPitCount()); + assertEquals(equalTo, stats.getPitTimeInMillis()); + assertEquals(equalTo, stats.getPitCurrent()); assertEquals(equalTo, stats.getSuggestCount()); assertEquals(equalTo, stats.getSuggestTimeInMillis()); assertEquals(equalTo, stats.getSuggestCurrent()); diff --git a/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java b/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java index b28423c3a8657..b730dc01c4871 100644 --- a/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java @@ -24,6 +24,9 @@ import org.opensearch.search.builder.PointInTimeBuilder; import org.opensearch.search.sort.SortOrder; import org.opensearch.test.OpenSearchSingleNodeTestCase; +import org.opensearch.index.IndexService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesService; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -74,7 +77,11 @@ public void testCreatePITSuccess() throws ExecutionException, InterruptedExcepti SearchService service = getInstanceFromNode(SearchService.class); assertEquals(2, service.getActiveContexts()); + validatePitStats("index", 1, 0, 0); + validatePitStats("index", 1, 0, 1); service.doClose(); // this kills the keep-alive reaper we have to reset the node after this test + validatePitStats("index", 0, 1, 0); + validatePitStats("index", 0, 1, 1); } public void testCreatePITWithMultipleIndicesSuccess() throws ExecutionException, InterruptedException { @@ -91,7 +98,12 @@ public void testCreatePITWithMultipleIndicesSuccess() throws ExecutionException, PitTestsUtil.assertUsingGetAllPits(client(), response.getId(), response.getCreationTime()); assertEquals(4, response.getSuccessfulShards()); assertEquals(4, service.getActiveContexts()); + + validatePitStats("index", 1, 0, 0); + validatePitStats("index1", 1, 0, 0); service.doClose(); + validatePitStats("index", 0, 1, 0); + validatePitStats("index1", 0, 1, 0); } public void testCreatePITWithShardReplicasSuccess() throws ExecutionException, InterruptedException { @@ -112,7 +124,11 @@ public void testCreatePITWithShardReplicasSuccess() throws ExecutionException, I SearchService service = getInstanceFromNode(SearchService.class); assertEquals(2, service.getActiveContexts()); + validatePitStats("index", 1, 0, 0); + validatePitStats("index", 1, 0, 1); service.doClose(); + validatePitStats("index", 0, 1, 0); + validatePitStats("index", 0, 1, 1); } public void testCreatePITWithNonExistentIndex() { @@ -198,6 +214,9 @@ public void testPitSearchOnCloseIndex() throws ExecutionException, InterruptedEx PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); SearchService service = getInstanceFromNode(SearchService.class); assertEquals(2, service.getActiveContexts()); + validatePitStats("index", 1, 0, 0); + validatePitStats("index", 1, 0, 1); + client().admin().indices().prepareClose("index").get(); SearchPhaseExecutionException ex = expectThrows(SearchPhaseExecutionException.class, () -> { SearchResponse searchResponse = client().prepareSearch() @@ -246,7 +265,10 @@ public void testMaxOpenPitContexts() throws Exception { + "This limit can be set by changing the [search.max_open_pit_context] setting." ) ); + final int maxPitContexts = SearchService.MAX_OPEN_PIT_CONTEXT.get(Settings.EMPTY); + validatePitStats("index", maxPitContexts, 0, 0); service.doClose(); + validatePitStats("index", 0, maxPitContexts, 0); } public void testOpenPitContextsConcurrently() throws Exception { @@ -292,7 +314,9 @@ public void testOpenPitContextsConcurrently() throws Exception { thread.join(); } assertThat(service.getActiveContexts(), equalTo(maxPitContexts)); + validatePitStats("index", maxPitContexts, 0, 0); service.doClose(); + validatePitStats("index", 0, maxPitContexts, 0); } /** @@ -461,9 +485,11 @@ public void testPitAfterUpdateIndex() throws Exception { .getTotalHits().value, Matchers.equalTo(0L) ); + validatePitStats("test", 1, 0, 0); } finally { service.doClose(); assertEquals(0, service.getActiveContexts()); + validatePitStats("test", 0, 1, 0); PitTestsUtil.assertGetAllPitsEmpty(client()); } } @@ -505,8 +531,21 @@ public void testConcurrentSearches() throws Exception { SearchService service = getInstanceFromNode(SearchService.class); assertEquals(2, service.getActiveContexts()); + validatePitStats("index", 1, 0, 0); + validatePitStats("index", 1, 0, 1); service.doClose(); assertEquals(0, service.getActiveContexts()); + validatePitStats("index", 0, 1, 0); + validatePitStats("index", 0, 1, 1); PitTestsUtil.assertGetAllPitsEmpty(client()); } + + public void validatePitStats(String index, long expectedPitCurrent, long expectedPitCount, int shardId) throws ExecutionException, + InterruptedException { + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService indexService = indicesService.indexServiceSafe(resolveIndex(index)); + IndexShard indexShard = indexService.getShard(shardId); + assertEquals(expectedPitCurrent, indexShard.searchStats().getTotal().getPitCurrent()); + assertEquals(expectedPitCount, indexShard.searchStats().getTotal().getPitCount()); + } } diff --git a/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java b/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java index 70efcf88cd581..e69b2cc523638 100644 --- a/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java +++ b/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java @@ -11,6 +11,8 @@ import org.junit.After; import org.junit.Before; import org.opensearch.action.ActionFuture; +import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; import org.opensearch.action.ActionListener; import org.opensearch.action.LatchedActionListener; import org.opensearch.action.search.CreatePitAction; @@ -76,6 +78,7 @@ public void testDeletePit() throws Exception { execute = client().execute(CreatePitAction.INSTANCE, request); pitResponse = execute.get(); pitIds.add(pitResponse.getId()); + validatePitStats("index", 10, 0); DeletePitRequest deletePITRequest = new DeletePitRequest(pitIds); ActionFuture deleteExecute = client().execute(DeletePitAction.INSTANCE, deletePITRequest); DeletePitResponse deletePITResponse = deleteExecute.get(); @@ -84,6 +87,7 @@ public void testDeletePit() throws Exception { assertTrue(pitIds.contains(deletePitInfo.getPitId())); assertTrue(deletePitInfo.isSuccessful()); } + validatePitStats("index", 0, 10); /** * Checking deleting the same PIT id again results in succeeded */ @@ -102,6 +106,7 @@ public void testDeletePitWithValidAndDeletedIds() throws Exception { CreatePitResponse pitResponse = execute.get(); List pitIds = new ArrayList<>(); pitIds.add(pitResponse.getId()); + validatePitStats("index", 5, 0); /** * Delete Pit #1 @@ -113,9 +118,11 @@ public void testDeletePitWithValidAndDeletedIds() throws Exception { assertTrue(pitIds.contains(deletePitInfo.getPitId())); assertTrue(deletePitInfo.isSuccessful()); } + validatePitStats("index", 0, 5); execute = client().execute(CreatePitAction.INSTANCE, request); pitResponse = execute.get(); pitIds.add(pitResponse.getId()); + validatePitStats("index", 5, 5); /** * Delete PIT with both Ids #1 (which is deleted) and #2 (which is present) */ @@ -126,6 +133,7 @@ public void testDeletePitWithValidAndDeletedIds() throws Exception { assertTrue(pitIds.contains(deletePitInfo.getPitId())); assertTrue(deletePitInfo.isSuccessful()); } + validatePitStats("index", 0, 10); } public void testDeletePitWithValidAndInvalidIds() throws Exception { @@ -148,6 +156,8 @@ public void testDeleteAllPits() throws Exception { client().prepareIndex("index1").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).execute().get(); ensureGreen(); createPitOnIndex("index1"); + validatePitStats("index", 5, 0); + validatePitStats("index1", 5, 0); DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); /** @@ -160,6 +170,8 @@ public void testDeleteAllPits() throws Exception { assertThat(deletePitInfo.getPitId(), not(blankOrNullString())); assertTrue(deletePitInfo.isSuccessful()); } + validatePitStats("index", 0, 5); + validatePitStats("index1", 0, 5); client().admin().indices().prepareDelete("index1").get(); } @@ -324,4 +336,16 @@ public void onFailure(Exception e) {} } } + public void validatePitStats(String index, long expectedPitCurrent, long expectedPitCount) throws ExecutionException, + InterruptedException { + IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); + indicesStatsRequest.indices(index); + indicesStatsRequest.all(); + IndicesStatsResponse indicesStatsResponse = client().admin().indices().stats(indicesStatsRequest).get(); + long pitCurrent = indicesStatsResponse.getIndex(index).getTotal().search.getTotal().getPitCurrent(); + long pitCount = indicesStatsResponse.getIndex(index).getTotal().search.getTotal().getPitCount(); + assertEquals(expectedPitCurrent, pitCurrent); + assertEquals(expectedPitCount, pitCount); + } + } diff --git a/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java b/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java index f93a43a027da7..29126d786770e 100644 --- a/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java +++ b/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java @@ -36,6 +36,8 @@ import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; import java.util.ArrayList; import java.util.HashSet; @@ -81,6 +83,7 @@ public void testPit() throws Exception { .get(); assertEquals(2, searchResponse.getSuccessfulShards()); assertEquals(2, searchResponse.getTotalShards()); + validatePitStats("index", 2, 2); PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); } @@ -94,6 +97,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { ExecutionException ex = expectThrows(ExecutionException.class, execute::get); assertTrue(ex.getMessage().contains("Failed to execute phase [create_pit]")); assertTrue(ex.getMessage().contains("Partial shards failure")); + validatePitStats("index", 0, 0); return super.onNodeStopped(nodeName); } }); @@ -116,6 +120,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { .get(); assertEquals(1, searchResponse.getSuccessfulShards()); assertEquals(1, searchResponse.getTotalShards()); + validatePitStats("index", 1, 1); return super.onNodeStopped(nodeName); } }); @@ -137,6 +142,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { assertEquals(1, searchResponse.getFailedShards()); assertEquals(0, searchResponse.getSkippedShards()); assertEquals(2, searchResponse.getTotalShards()); + validatePitStats("index", 1, 1); PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); return super.onNodeStopped(nodeName); } @@ -327,6 +333,18 @@ public void onFailure(Exception e) {} } } + public void validatePitStats(String index, long expectedPitCurrent, long expectedOpenContexts) throws ExecutionException, + InterruptedException { + IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); + indicesStatsRequest.indices("index"); + indicesStatsRequest.all(); + IndicesStatsResponse indicesStatsResponse = client().admin().indices().stats(indicesStatsRequest).get(); + long pitCurrent = indicesStatsResponse.getIndex(index).getTotal().search.getTotal().getPitCurrent(); + long openContexts = indicesStatsResponse.getIndex(index).getTotal().search.getOpenContexts(); + assertEquals(expectedPitCurrent, pitCurrent); + assertEquals(expectedOpenContexts, openContexts); + } + public void testGetAllPits() throws Exception { client().admin().indices().prepareCreate("index1").get(); CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); diff --git a/server/src/test/java/org/opensearch/search/SearchServiceTests.java b/server/src/test/java/org/opensearch/search/SearchServiceTests.java index 9c140cda6ccc8..1f824d40eb638 100644 --- a/server/src/test/java/org/opensearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/opensearch/search/SearchServiceTests.java @@ -1429,7 +1429,7 @@ private ReaderContext createReaderContext(IndexService indexService, IndexShard ); } - public void testDeletePitReaderContext() { + public void testDeletePitReaderContext() throws ExecutionException, InterruptedException { createIndex("index"); SearchService searchService = getInstanceFromNode(SearchService.class); PlainActionFuture future = new PlainActionFuture<>(); @@ -1444,6 +1444,7 @@ public void testDeletePitReaderContext() { assertThat(searchService.getActiveContexts(), equalTo(1)); assertThat(searchService.getAllPITReaderContexts().size(), equalTo(1)); + validatePitStats("index", 1, 0, 0); DeletePitResponse deletePitResponse = searchService.freeReaderContextsIfFound(contextIds); assertTrue(deletePitResponse.getDeletePitResults().get(0).isSuccessful()); // assert true for reader context not found @@ -1451,6 +1452,7 @@ public void testDeletePitReaderContext() { assertTrue(deletePitResponse.getDeletePitResults().get(0).isSuccessful()); // adding this assert to showcase behavior difference assertFalse(searchService.freeReaderContext(future.actionGet())); + validatePitStats("index", 0, 1, 0); } public void testPitContextMaxKeepAlive() { @@ -1476,7 +1478,7 @@ public void testPitContextMaxKeepAlive() { assertThat(searchService.getAllPITReaderContexts().size(), equalTo(0)); } - public void testUpdatePitId() { + public void testUpdatePitId() throws ExecutionException, InterruptedException { createIndex("index"); SearchService searchService = getInstanceFromNode(SearchService.class); PlainActionFuture future = new PlainActionFuture<>(); @@ -1497,7 +1499,9 @@ public void testUpdatePitId() { assertTrue(updateResponse.getPitId().equalsIgnoreCase("pitId")); assertThat(searchService.getActiveContexts(), equalTo(1)); assertThat(searchService.getAllPITReaderContexts().size(), equalTo(1)); + validatePitStats("index", 1, 0, 0); assertTrue(searchService.freeReaderContext(future.actionGet())); + validatePitStats("index", 0, 1, 0); } public void testUpdatePitIdMaxKeepAlive() { @@ -1552,4 +1556,13 @@ public void testUpdatePitIdWithInvalidReaderId() { assertThat(searchService.getActiveContexts(), equalTo(0)); assertThat(searchService.getAllPITReaderContexts().size(), equalTo(0)); } + + public void validatePitStats(String index, long expectedPitCurrent, long expectedPitCount, int shardId) throws ExecutionException, + InterruptedException { + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService indexService = indicesService.indexServiceSafe(resolveIndex(index)); + IndexShard indexShard = indexService.getShard(shardId); + assertEquals(expectedPitCurrent, indexShard.searchStats().getTotal().getPitCurrent()); + assertEquals(expectedPitCount, indexShard.searchStats().getTotal().getPitCount()); + } }