Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Point In Time Node Stats API ServiceLayer Changes #4030

Merged
merged 6 commits into from
Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.index.search.stats;

import org.opensearch.Version;
import org.opensearch.common.Nullable;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -77,6 +78,10 @@ public static class Stats implements Writeable, ToXContentFragment {
private long suggestTimeInMillis;
private long suggestCurrent;

private long pitCount;
private long pitTimeInMillis;
private long pitCurrent;

private Stats() {
// for internal use, initializes all counts to 0
}
Expand All @@ -91,6 +96,9 @@ public Stats(
long scrollCount,
long scrollTimeInMillis,
long scrollCurrent,
long pitCount,
long pitTimeInMillis,
long pitCurrent,
long suggestCount,
long suggestTimeInMillis,
long suggestCurrent
Expand All @@ -110,6 +118,10 @@ public Stats(
this.suggestCount = suggestCount;
this.suggestTimeInMillis = suggestTimeInMillis;
this.suggestCurrent = suggestCurrent;

this.pitCount = pitCount;
this.pitTimeInMillis = pitTimeInMillis;
this.pitCurrent = pitCurrent;
}

private Stats(StreamInput in) throws IOException {
Expand All @@ -128,6 +140,12 @@ private Stats(StreamInput in) throws IOException {
suggestCount = in.readVLong();
suggestTimeInMillis = in.readVLong();
suggestCurrent = in.readVLong();

if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
pitCount = in.readVLong();
pitTimeInMillis = in.readVLong();
pitCurrent = in.readVLong();
}
ajaymovva marked this conversation as resolved.
Show resolved Hide resolved
}

public void add(Stats stats) {
Expand All @@ -146,6 +164,10 @@ public void add(Stats stats) {
suggestCount += stats.suggestCount;
suggestTimeInMillis += stats.suggestTimeInMillis;
suggestCurrent += stats.suggestCurrent;

pitCount += stats.pitCount;
pitTimeInMillis += stats.pitTimeInMillis;
pitCurrent += stats.pitCurrent;
}

public void addForClosingShard(Stats stats) {
Expand All @@ -162,6 +184,10 @@ public void addForClosingShard(Stats stats) {

suggestCount += stats.suggestCount;
suggestTimeInMillis += stats.suggestTimeInMillis;

pitCount += stats.pitCount;
pitTimeInMillis += stats.pitTimeInMillis;
pitCurrent += stats.pitCurrent;
}

public long getQueryCount() {
Expand Down Expand Up @@ -212,6 +238,22 @@ public long getScrollCurrent() {
return scrollCurrent;
}

public long getPitCount() {
return pitCount;
}

public TimeValue getPitTime() {
return new TimeValue(pitTimeInMillis);
}

public long getPitTimeInMillis() {
return pitTimeInMillis;
}

public long getPitCurrent() {
return pitCurrent;
}

public long getSuggestCount() {
return suggestCount;
}
Expand Down Expand Up @@ -249,6 +291,12 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(suggestCount);
out.writeVLong(suggestTimeInMillis);
out.writeVLong(suggestCurrent);

if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeVLong(pitCount);
out.writeVLong(pitTimeInMillis);
out.writeVLong(pitCurrent);
}
}

@Override
Expand All @@ -265,6 +313,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.humanReadableField(Fields.SCROLL_TIME_IN_MILLIS, Fields.SCROLL_TIME, getScrollTime());
builder.field(Fields.SCROLL_CURRENT, scrollCurrent);

builder.field(Fields.PIT_TOTAL, pitCount);
builder.humanReadableField(Fields.PIT_TIME_IN_MILLIS, Fields.PIT_TIME, getPitTime());
builder.field(Fields.PIT_CURRENT, pitCurrent);

builder.field(Fields.SUGGEST_TOTAL, suggestCount);
builder.humanReadableField(Fields.SUGGEST_TIME_IN_MILLIS, Fields.SUGGEST_TIME, getSuggestTime());
builder.field(Fields.SUGGEST_CURRENT, suggestCurrent);
Expand Down Expand Up @@ -385,6 +437,10 @@ static final class Fields {
static final String SCROLL_TIME = "scroll_time";
static final String SCROLL_TIME_IN_MILLIS = "scroll_time_in_millis";
static final String SCROLL_CURRENT = "scroll_current";
static final String PIT_TOTAL = "pit_total";
static final String PIT_TIME = "pit_time";
static final String PIT_TIME_IN_MILLIS = "pit_time_in_millis";
static final String PIT_CURRENT = "pit_current";
ajaymovva marked this conversation as resolved.
Show resolved Hide resolved
static final String SUGGEST_TOTAL = "suggest_total";
static final String SUGGEST_TIME = "suggest_time";
static final String SUGGEST_TIME_IN_MILLIS = "suggest_time_in_millis";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,18 @@ public void onFreeScrollContext(ReaderContext readerContext) {
totalStats.scrollMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - readerContext.getStartTimeInNano()));
}

@Override
public void onNewPitContext(ReaderContext readerContext) {
totalStats.pitCurrent.inc();
}

@Override
public void onFreePitContext(ReaderContext readerContext) {
totalStats.pitCurrent.dec();
assert totalStats.pitCurrent.count() >= 0;
totalStats.pitMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - readerContext.getStartTimeInNano()));
}

/**
* Holder of statistics values
*
Expand All @@ -203,10 +215,12 @@ static final class StatsHolder {
* for one-thousand times as long (i.e., scrolls that execute for almost twelve days on average).
*/
final MeanMetric scrollMetric = new MeanMetric();
final MeanMetric pitMetric = new MeanMetric();
final MeanMetric suggestMetric = new MeanMetric();
final CounterMetric queryCurrent = new CounterMetric();
final CounterMetric fetchCurrent = new CounterMetric();
final CounterMetric scrollCurrent = new CounterMetric();
final CounterMetric pitCurrent = new CounterMetric();
final CounterMetric suggestCurrent = new CounterMetric();

SearchStats.Stats stats() {
Expand All @@ -220,6 +234,9 @@ SearchStats.Stats stats() {
scrollMetric.count(),
TimeUnit.MICROSECONDS.toMillis(scrollMetric.sum()),
scrollCurrent.count(),
pitMetric.count(),
TimeUnit.MICROSECONDS.toMillis(pitMetric.sum()),
pitCurrent.count(),
suggestMetric.count(),
TimeUnit.NANOSECONDS.toMillis(suggestMetric.sum()),
suggestCurrent.count()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ public void testShardLevelSearchGroupStats() throws Exception {
// let's create two dummy search stats with groups
Map<String, Stats> groupStats1 = new HashMap<>();
Map<String, Stats> groupStats2 = new HashMap<>();
groupStats2.put("group1", new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1));
SearchStats searchStats1 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats1);
SearchStats searchStats2 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats2);
groupStats2.put("group1", new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1));
SearchStats searchStats1 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats1);
SearchStats searchStats2 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats2);

// adding these two search stats and checking group stats are correct
searchStats1.add(searchStats2);
Expand Down Expand Up @@ -75,6 +75,9 @@ private static void assertStats(Stats stats, long equalTo) {
assertEquals(equalTo, stats.getScrollCount());
assertEquals(equalTo, stats.getScrollTimeInMillis());
assertEquals(equalTo, stats.getScrollCurrent());
assertEquals(equalTo, stats.getPitCount());
assertEquals(equalTo, stats.getPitTimeInMillis());
assertEquals(equalTo, stats.getPitCurrent());
assertEquals(equalTo, stats.getSuggestCount());
assertEquals(equalTo, stats.getSuggestTimeInMillis());
assertEquals(equalTo, stats.getSuggestCurrent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.test.OpenSearchSingleNodeTestCase;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;

import java.util.Map;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -74,7 +77,11 @@ public void testCreatePITSuccess() throws ExecutionException, InterruptedExcepti

SearchService service = getInstanceFromNode(SearchService.class);
assertEquals(2, service.getActiveContexts());
validatePitStats("index", 1, 0, 0);
validatePitStats("index", 1, 0, 1);
service.doClose(); // this kills the keep-alive reaper we have to reset the node after this test
validatePitStats("index", 0, 1, 0);
validatePitStats("index", 0, 1, 1);
}

public void testCreatePITWithMultipleIndicesSuccess() throws ExecutionException, InterruptedException {
Expand All @@ -91,7 +98,12 @@ public void testCreatePITWithMultipleIndicesSuccess() throws ExecutionException,
PitTestsUtil.assertUsingGetAllPits(client(), response.getId(), response.getCreationTime());
assertEquals(4, response.getSuccessfulShards());
assertEquals(4, service.getActiveContexts());

validatePitStats("index", 1, 0, 0);
validatePitStats("index1", 1, 0, 0);
service.doClose();
validatePitStats("index", 0, 1, 0);
validatePitStats("index1", 0, 1, 0);
}

public void testCreatePITWithShardReplicasSuccess() throws ExecutionException, InterruptedException {
Expand All @@ -112,7 +124,11 @@ public void testCreatePITWithShardReplicasSuccess() throws ExecutionException, I

SearchService service = getInstanceFromNode(SearchService.class);
assertEquals(2, service.getActiveContexts());
validatePitStats("index", 1, 0, 0);
validatePitStats("index", 1, 0, 1);
service.doClose();
validatePitStats("index", 0, 1, 0);
validatePitStats("index", 0, 1, 1);
}

public void testCreatePITWithNonExistentIndex() {
Expand Down Expand Up @@ -198,6 +214,9 @@ public void testPitSearchOnCloseIndex() throws ExecutionException, InterruptedEx
PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime());
SearchService service = getInstanceFromNode(SearchService.class);
assertEquals(2, service.getActiveContexts());
validatePitStats("index", 1, 0, 0);
validatePitStats("index", 1, 0, 1);

client().admin().indices().prepareClose("index").get();
SearchPhaseExecutionException ex = expectThrows(SearchPhaseExecutionException.class, () -> {
SearchResponse searchResponse = client().prepareSearch()
Expand Down Expand Up @@ -246,7 +265,10 @@ public void testMaxOpenPitContexts() throws Exception {
+ "This limit can be set by changing the [search.max_open_pit_context] setting."
)
);
final int maxPitContexts = SearchService.MAX_OPEN_PIT_CONTEXT.get(Settings.EMPTY);
validatePitStats("index", maxPitContexts, 0, 0);
service.doClose();
validatePitStats("index", 0, maxPitContexts, 0);
}

public void testOpenPitContextsConcurrently() throws Exception {
Expand Down Expand Up @@ -292,7 +314,9 @@ public void testOpenPitContextsConcurrently() throws Exception {
thread.join();
}
assertThat(service.getActiveContexts(), equalTo(maxPitContexts));
validatePitStats("index", maxPitContexts, 0, 0);
service.doClose();
validatePitStats("index", 0, maxPitContexts, 0);
}

/**
Expand Down Expand Up @@ -461,9 +485,11 @@ public void testPitAfterUpdateIndex() throws Exception {
.getTotalHits().value,
Matchers.equalTo(0L)
);
validatePitStats("test", 1, 0, 0);
} finally {
service.doClose();
assertEquals(0, service.getActiveContexts());
validatePitStats("test", 0, 1, 0);
PitTestsUtil.assertGetAllPitsEmpty(client());
}
}
Expand Down Expand Up @@ -505,8 +531,21 @@ public void testConcurrentSearches() throws Exception {

SearchService service = getInstanceFromNode(SearchService.class);
assertEquals(2, service.getActiveContexts());
validatePitStats("index", 1, 0, 0);
validatePitStats("index", 1, 0, 1);
service.doClose();
assertEquals(0, service.getActiveContexts());
validatePitStats("index", 0, 1, 0);
validatePitStats("index", 0, 1, 1);
PitTestsUtil.assertGetAllPitsEmpty(client());
}

public void validatePitStats(String index, long expectedPitCurrent, long expectedPitCount, int shardId) throws ExecutionException,
InterruptedException {
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexServiceSafe(resolveIndex(index));
IndexShard indexShard = indexService.getShard(shardId);
assertEquals(expectedPitCurrent, indexShard.searchStats().getTotal().getPitCurrent());
assertEquals(expectedPitCount, indexShard.searchStats().getTotal().getPitCount());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.junit.After;
import org.junit.Before;
import org.opensearch.action.ActionFuture;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.ActionListener;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.action.search.CreatePitAction;
Expand Down Expand Up @@ -76,6 +78,7 @@ public void testDeletePit() throws Exception {
execute = client().execute(CreatePitAction.INSTANCE, request);
pitResponse = execute.get();
pitIds.add(pitResponse.getId());
validatePitStats("index", 10, 0);
DeletePitRequest deletePITRequest = new DeletePitRequest(pitIds);
ActionFuture<DeletePitResponse> deleteExecute = client().execute(DeletePitAction.INSTANCE, deletePITRequest);
DeletePitResponse deletePITResponse = deleteExecute.get();
Expand All @@ -84,6 +87,7 @@ public void testDeletePit() throws Exception {
assertTrue(pitIds.contains(deletePitInfo.getPitId()));
assertTrue(deletePitInfo.isSuccessful());
}
validatePitStats("index", 0, 10);
/**
* Checking deleting the same PIT id again results in succeeded
*/
Expand All @@ -102,6 +106,7 @@ public void testDeletePitWithValidAndDeletedIds() throws Exception {
CreatePitResponse pitResponse = execute.get();
List<String> pitIds = new ArrayList<>();
pitIds.add(pitResponse.getId());
validatePitStats("index", 5, 0);

/**
* Delete Pit #1
Expand All @@ -113,9 +118,11 @@ public void testDeletePitWithValidAndDeletedIds() throws Exception {
assertTrue(pitIds.contains(deletePitInfo.getPitId()));
assertTrue(deletePitInfo.isSuccessful());
}
validatePitStats("index", 0, 5);
execute = client().execute(CreatePitAction.INSTANCE, request);
pitResponse = execute.get();
pitIds.add(pitResponse.getId());
validatePitStats("index", 5, 5);
/**
* Delete PIT with both Ids #1 (which is deleted) and #2 (which is present)
*/
Expand All @@ -126,6 +133,7 @@ public void testDeletePitWithValidAndDeletedIds() throws Exception {
assertTrue(pitIds.contains(deletePitInfo.getPitId()));
assertTrue(deletePitInfo.isSuccessful());
}
validatePitStats("index", 0, 10);
}

public void testDeletePitWithValidAndInvalidIds() throws Exception {
Expand All @@ -148,6 +156,8 @@ public void testDeleteAllPits() throws Exception {
client().prepareIndex("index1").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).execute().get();
ensureGreen();
createPitOnIndex("index1");
validatePitStats("index", 5, 0);
validatePitStats("index1", 5, 0);
DeletePitRequest deletePITRequest = new DeletePitRequest("_all");

/**
Expand All @@ -160,6 +170,8 @@ public void testDeleteAllPits() throws Exception {
assertThat(deletePitInfo.getPitId(), not(blankOrNullString()));
assertTrue(deletePitInfo.isSuccessful());
}
validatePitStats("index", 0, 5);
validatePitStats("index1", 0, 5);
client().admin().indices().prepareDelete("index1").get();
}

Expand Down Expand Up @@ -324,4 +336,16 @@ public void onFailure(Exception e) {}
}
}

public void validatePitStats(String index, long expectedPitCurrent, long expectedPitCount) throws ExecutionException,
InterruptedException {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.indices(index);
indicesStatsRequest.all();
IndicesStatsResponse indicesStatsResponse = client().admin().indices().stats(indicesStatsRequest).get();
long pitCurrent = indicesStatsResponse.getIndex(index).getTotal().search.getTotal().getPitCurrent();
long pitCount = indicesStatsResponse.getIndex(index).getTotal().search.getTotal().getPitCount();
assertEquals(expectedPitCurrent, pitCurrent);
assertEquals(expectedPitCount, pitCount);
}

}
Loading