diff --git a/server/src/main/java/org/opensearch/action/support/IndicesOptions.java b/server/src/main/java/org/opensearch/action/support/IndicesOptions.java index 2d9fecddb6f7d..2566f00f8dc7d 100644 --- a/server/src/main/java/org/opensearch/action/support/IndicesOptions.java +++ b/server/src/main/java/org/opensearch/action/support/IndicesOptions.java @@ -413,6 +413,16 @@ public static IndicesOptions fromRequest(RestRequest request, IndicesOptions def ); } + public static IndicesOptions fromPaginatedRequest(RestRequest request, IndicesOptions defaultSettings) { + return fromParameters( + null, + request.param("ignore_unavailable"), + request.param("allow_no_indices"), + request.param("ignore_throttled"), + defaultSettings + ); + } + public static IndicesOptions fromMap(Map map, IndicesOptions defaultSettings) { return fromParameters( map.containsKey("expand_wildcards") ? map.get("expand_wildcards") : map.get("expandWildcards"), diff --git a/server/src/main/java/org/opensearch/common/Table.java b/server/src/main/java/org/opensearch/common/Table.java index da14f628efa0f..5e80b44b7fdd8 100644 --- a/server/src/main/java/org/opensearch/common/Table.java +++ b/server/src/main/java/org/opensearch/common/Table.java @@ -43,6 +43,8 @@ import java.util.Map; import java.util.concurrent.TimeUnit; +import reactor.util.annotation.NonNull; + import static java.util.Collections.emptyMap; /** @@ -59,9 +61,19 @@ public class Table { private List currentCells; private boolean inHeaders = false; private boolean withTime = false; + private PaginationMetadata paginationMetadata = new PaginationMetadata(false, null, null); + public static final String EPOCH = "epoch"; public static final String TIMESTAMP = "timestamp"; + public Table() {} + + public Table(@Nullable PaginationMetadata paginationMetadata) { + if (paginationMetadata != null) { + this.paginationMetadata = paginationMetadata; + } + } + public Table startHeaders() { inHeaders = true; currentCells = new ArrayList<>(); @@ -230,6 +242,18 @@ public Map getAliasMap() { return headerAliasMap; } + public boolean isTablePaginated() { + return paginationMetadata.isResponsePaginated; + } + + public String getNextTokenForTable() { + return paginationMetadata.nextToken; + } + + public String getPaginatedElementForTable() { + return paginationMetadata.paginatedElement; + } + /** * Cell in a table * @@ -254,4 +278,34 @@ public Cell(Object value, Map attr) { this.attr = attr; } } + + /** + * Pagination metadata for a table. + * + * @opensearch.internal + */ + public static class PaginationMetadata { + + /** + * boolean denoting whether the table is paginated or not. + */ + public final boolean isResponsePaginated; + + /** + * String denoting the element which is being paginated (for e.g. shards, indices..). + */ + public final String paginatedElement; + + /** + * String denoting the nextToken of paginated response, which will be used to fetch nextPage (if any). + */ + public final String nextToken; + + public PaginationMetadata(@NonNull boolean isResponsePaginated, @Nullable String paginatedElement, @Nullable String nextToken) { + this.isResponsePaginated = isResponsePaginated; + assert !isResponsePaginated || paginatedElement != null : "paginatedElement must be specified for a table which is paginated"; + this.paginatedElement = paginatedElement; + this.nextToken = nextToken; + } + } } diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java index 9dc711f804144..2ff7eebecf6b3 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java @@ -62,21 +62,26 @@ import org.opensearch.rest.RestResponse; import org.opensearch.rest.action.RestResponseListener; +import java.nio.charset.StandardCharsets; import java.time.Instant; import java.time.ZoneOffset; import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.Base64; import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.Spliterators; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Arrays.asList; import static java.util.Collections.unmodifiableList; import static org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest.DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT; @@ -95,6 +100,9 @@ public class RestIndicesAction extends AbstractCatAction { "Parameter [master_timeout] is deprecated and will be removed in 3.0. To support inclusive language, please use [cluster_manager_timeout] instead."; private static final String DUPLICATE_PARAMETER_ERROR_MESSAGE = "Please only use one of the request parameters [master_timeout, cluster_manager_timeout]."; + private static final String INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE = + "Parameter [nextToken] has been tainted and is incorrect. Please provide a valid [nextToken]."; + private static final String DEFAULT_CAT_INDICES_PAGE_SIZE_STRING = "100"; @Override public List routes() { @@ -119,6 +127,9 @@ protected void documentation(StringBuilder sb) { @Override public RestChannelConsumer doCatRequest(final RestRequest request, final NodeClient client) { + if (request.hasParam("nextToken")) { + return doPaginatedCatRequest(request, client); + } final String[] indices = Strings.splitStringByCommaToArray(request.param("index")); final IndicesOptions indicesOptions = IndicesOptions.fromRequest(request, IndicesOptions.strictExpand()); final boolean local = request.paramAsBoolean("local", false); @@ -205,6 +216,113 @@ public void onFailure(final Exception e) { }; } + public RestChannelConsumer doPaginatedCatRequest(final RestRequest request, final NodeClient client) { + // Paginated query will not read "indices" and "expand_wildcard" query parameters, and hence if specified with nextToken + // will result in "illegal_argument_exception". + TimeValue clusterManagerTimeout = request.paramAsTime("cluster_manager_timeout", DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT); + // Remove the if condition and statements inside after removing MASTER_ROLE. + if (request.hasParam("master_timeout")) { + deprecationLogger.deprecate("cat_indices_master_timeout_parameter", MASTER_TIMEOUT_DEPRECATED_MESSAGE); + if (request.hasParam("cluster_manager_timeout")) { + throw new OpenSearchParseException(DUPLICATE_PARAMETER_ERROR_MESSAGE); + } + clusterManagerTimeout = request.paramAsTime("master_timeout", DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT); + } + final boolean local = request.paramAsBoolean("local", false); + final TimeValue clusterManagerNodeTimeout = clusterManagerTimeout; + final boolean includeUnloadedSegments = request.paramAsBoolean("include_unloaded_segments", false); + // Get the nextToken and pageSize provided in the request + final String nextTokenInRequest = validateAndGetNextTokenFromRequest(request); + final int pageSize = Integer.parseInt(request.param("pageSize", DEFAULT_CAT_INDICES_PAGE_SIZE_STRING)); + + return channel -> { + final ActionListener listener = ActionListener.notifyOnce(new RestResponseListener
(channel) { + @Override + public RestResponse buildResponse(final Table table) throws Exception { + return RestTable.buildResponse(table, channel); + } + }); + + // Fetch all the indices from clusterStateRequest for a paginated query. + sendClusterStateRequest( + new String[0], + IndicesOptions.lenientExpandHidden(), + local, + clusterManagerNodeTimeout, + client, + new ActionListener() { + @Override + public void onResponse(final ClusterStateResponse clusterStateResponse) { + // Get a sorted list of indices from metadata and filter out the required number of indices + List sortedIndicesList = getListOfIndicesSortedByCreateTime(clusterStateResponse); + final int newPageStartIndexNumber = getNewPageIndexStartNumber( + nextTokenInRequest, + sortedIndicesList, + clusterStateResponse + ); // inclusive + int newPageEndIndexNumber = Math.min(newPageStartIndexNumber + pageSize, sortedIndicesList.size()); // exclusive + String[] indicesToBeQueried = sortedIndicesList.subList(newPageStartIndexNumber, newPageEndIndexNumber) + .toArray(new String[0]); + // Generate the new nextToken which is to be passed in the response + // NextToken = "IndexNumberToStartTheNextPageFrom + $ + CreationTimeOfLastRespondedIndex + $ + + // NameOfLastRespondedIndex" -> (1$12345678$testIndex) + final String nextToken = newPageEndIndexNumber >= sortedIndicesList.size() + ? null + : Base64.getEncoder() + .encodeToString( + (newPageEndIndexNumber + + "$" + + clusterStateResponse.getState() + .metadata() + .indices() + .get(sortedIndicesList.get(newPageEndIndexNumber - 1)) + .getCreationDate() + + "$" + + sortedIndicesList.get(newPageEndIndexNumber - 1)).getBytes(StandardCharsets.UTF_8) + ); + + final GroupedActionListener groupedListener = createGroupedListener( + request, + 4, + listener, + new Table.PaginationMetadata(true, "indices", nextToken) + ); + groupedListener.onResponse(clusterStateResponse); + + sendGetSettingsRequest( + indicesToBeQueried, + IndicesOptions.fromPaginatedRequest(request, IndicesOptions.strictExpand()), + local, + clusterManagerNodeTimeout, + client, + ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure) + ); + sendIndicesStatsRequest( + indicesToBeQueried, + IndicesOptions.lenientExpandHidden(), + includeUnloadedSegments, + client, + ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure) + ); + sendClusterHealthRequest( + indicesToBeQueried, + IndicesOptions.lenientExpandHidden(), + local, + clusterManagerNodeTimeout, + client, + ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure) + ); + } + + @Override + public void onFailure(final Exception e) { + listener.onFailure(e); + } + } + ); + }; + } + /** * We're using the Get Settings API here to resolve the authorized indices for the user. * This is because the Cluster State and Cluster Health APIs do not filter output based @@ -288,6 +406,15 @@ private GroupedActionListener createGroupedListener( final RestRequest request, final int size, final ActionListener
listener + ) { + return createGroupedListener(request, size, listener, null); + } + + private GroupedActionListener createGroupedListener( + final RestRequest request, + final int size, + final ActionListener
listener, + final Table.PaginationMetadata paginationMetadata ) { return new GroupedActionListener<>(new ActionListener>() { @Override @@ -311,7 +438,14 @@ public void onResponse(final Collection responses) { IndicesStatsResponse statsResponse = extractResponse(responses, IndicesStatsResponse.class); Map indicesStats = statsResponse.getIndices(); - Table responseTable = buildTable(request, indicesSettings, indicesHealths, indicesStats, indicesStates); + Table responseTable = buildTable( + request, + indicesSettings, + indicesHealths, + indicesStats, + indicesStates, + paginationMetadata + ); listener.onResponse(responseTable); } catch (Exception e) { onFailure(e); @@ -340,7 +474,11 @@ protected Set responseParams() { @Override protected Table getTableWithHeader(final RestRequest request) { - Table table = new Table(); + return getTableWithHeader(request, null); + } + + protected Table getTableWithHeader(final RestRequest request, final Table.PaginationMetadata paginationMetadata) { + Table table = new Table(paginationMetadata); table.startHeaders(); table.addCell("health", "alias:h;desc:current health status"); table.addCell("status", "alias:s;desc:open/close status"); @@ -709,11 +847,12 @@ Table buildTable( final Map indicesSettings, final Map indicesHealths, final Map indicesStats, - final Map indicesMetadatas + final Map indicesMetadatas, + final Table.PaginationMetadata paginationMetadata ) { final String healthParam = request.param("health"); - final Table table = getTableWithHeader(request); + final Table table = getTableWithHeader(request, paginationMetadata); indicesSettings.forEach((indexName, settings) -> { if (indicesMetadatas.containsKey(indexName) == false) { @@ -991,4 +1130,77 @@ Table buildTable( private static A extractResponse(final Collection responses, Class c) { return (A) responses.stream().filter(c::isInstance).findFirst().get(); } + + private List getListOfIndicesSortedByCreateTime(final ClusterStateResponse clusterStateResponse) { + List indicesList = new ArrayList(clusterStateResponse.getState().getRoutingTable().getIndicesRouting().keySet()); + indicesList.sort((index1, index2) -> { + Long index1CreationTimeStamp = clusterStateResponse.getState().metadata().indices().get(index1).getCreationDate(); + Long index2CreationTimeStamp = clusterStateResponse.getState().metadata().indices().get(index2).getCreationDate(); + if (index1CreationTimeStamp.equals(index2CreationTimeStamp)) { + return index1.compareTo(index2); + } + return (index1CreationTimeStamp - index2CreationTimeStamp) > 0 ? 1 : -1; + }); + return indicesList; + } + + private String validateAndGetNextTokenFromRequest(RestRequest request) { + String nextTokenInRequest; + try { + nextTokenInRequest = Objects.equals(request.param("nextToken"), "null") + ? null + : new String(Base64.getDecoder().decode(request.param("nextToken")), UTF_8); + } catch (IllegalArgumentException exception) { + throw new OpenSearchParseException(INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE); + } + + if (Objects.isNull(nextTokenInRequest)) { + return null; + } + final String[] nextTokenElements = nextTokenInRequest.split("\\$"); + if (nextTokenElements.length != 3) { + throw new OpenSearchParseException(INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE); + } + + try { + int newPageStartIndexNumber = Integer.parseInt(nextTokenElements[0]); + long lastIndexCreationTime = Long.parseLong(nextTokenElements[1]); + if (newPageStartIndexNumber < 0 || lastIndexCreationTime < 0) { + throw new OpenSearchParseException(INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE); + } + } catch (NumberFormatException exception) { + throw new OpenSearchParseException(INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE); + } + return nextTokenInRequest; + } + + private int getNewPageIndexStartNumber( + final String nextTokenInRequest, + final List sortedIndicesList, + final ClusterStateResponse clusterStateResponse + ) { + if (Objects.isNull(nextTokenInRequest)) { + return 0; + } + final String[] nextTokenElements = nextTokenInRequest.split("\\$"); + int newPageStartIndexNumber = Math.min(Integer.parseInt(nextTokenElements[0]), sortedIndicesList.size()); + long lastIndexCreationTime = Long.parseLong(nextTokenElements[1]); + String lastIndexName = nextTokenElements[2]; + if (newPageStartIndexNumber > 0 && !Objects.equals(sortedIndicesList.get(newPageStartIndexNumber - 1), lastIndexName)) { + // case denoting an already responded index has been deleted while the paginated queries are being executed + // find the index whose creation time is just after the index which was last responded + newPageStartIndexNumber--; + while (newPageStartIndexNumber > 0) { + if (clusterStateResponse.getState() + .metadata() + .indices() + .get(sortedIndicesList.get(newPageStartIndexNumber - 1)) + .getCreationDate() < lastIndexCreationTime) { + break; + } + newPageStartIndexNumber--; + } + } + return newPageStartIndexNumber; + } } diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestTable.java b/server/src/main/java/org/opensearch/rest/action/cat/RestTable.java index 4f1090b163ee6..2cb73ff44919e 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestTable.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestTable.java @@ -88,7 +88,15 @@ public static RestResponse buildXContentBuilder(Table table, RestChannel channel XContentBuilder builder = channel.newBuilder(); List displayHeaders = buildDisplayHeaders(table, request); - builder.startArray(); + if (table.isTablePaginated()) { + assert table.getPaginatedElementForTable() != null : "Paginated element is required in-case nextToken is not null"; + builder.startObject(); + builder.field("nextToken", table.getNextTokenForTable()); + builder.startArray(table.getPaginatedElementForTable()); + } else { + builder.startArray(); + } + List rowOrder = getRowOrder(table, request); for (Integer row : rowOrder) { builder.startObject(); @@ -98,6 +106,11 @@ public static RestResponse buildXContentBuilder(Table table, RestChannel channel builder.endObject(); } builder.endArray(); + + if (table.isTablePaginated()) { + builder.endObject(); + } + return new BytesRestResponse(RestStatus.OK, builder); } @@ -136,6 +149,13 @@ public static RestResponse buildTextPlainResponse(Table table, RestChannel chann } out.append("\n"); } + + // Adding a nextToken row, post an empty line, in the response if the table is paginated. + if (table.isTablePaginated()) { + out.append("\n"); + out.append("nextToken" + " " + table.getNextTokenForTable()); + out.append("\n"); + } out.close(); return new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, bytesOut.bytes()); } diff --git a/server/src/test/java/org/opensearch/rest/action/cat/RestIndicesActionTests.java b/server/src/test/java/org/opensearch/rest/action/cat/RestIndicesActionTests.java index 96b1c75371697..48f0b5e04fffe 100644 --- a/server/src/test/java/org/opensearch/rest/action/cat/RestIndicesActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/cat/RestIndicesActionTests.java @@ -138,7 +138,7 @@ public void testBuildTable() { } final RestIndicesAction action = new RestIndicesAction(); - final Table table = action.buildTable(new FakeRestRequest(), indicesSettings, indicesHealths, indicesStats, indicesMetadatas); + final Table table = action.buildTable(new FakeRestRequest(), indicesSettings, indicesHealths, indicesStats, indicesMetadatas, null); // now, verify the table is correct List headers = table.getHeaders();