Skip to content

Commit

Permalink
Implementing pagination for _cat/indices API
Browse files Browse the repository at this point in the history
Signed-off-by: Harsh Garg <gkharsh@amazon.com>
  • Loading branch information
Harsh Garg committed Jul 10, 2024
1 parent f14b5c8 commit 4924a71
Show file tree
Hide file tree
Showing 5 changed files with 302 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,16 @@ public static IndicesOptions fromRequest(RestRequest request, IndicesOptions def
);
}

public static IndicesOptions fromPaginatedRequest(RestRequest request, IndicesOptions defaultSettings) {
return fromParameters(
null,
request.param("ignore_unavailable"),
request.param("allow_no_indices"),
request.param("ignore_throttled"),
defaultSettings
);
}

public static IndicesOptions fromMap(Map<String, Object> map, IndicesOptions defaultSettings) {
return fromParameters(
map.containsKey("expand_wildcards") ? map.get("expand_wildcards") : map.get("expandWildcards"),
Expand Down
54 changes: 54 additions & 0 deletions server/src/main/java/org/opensearch/common/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import reactor.util.annotation.NonNull;

import static java.util.Collections.emptyMap;

/**
Expand All @@ -59,9 +61,19 @@ public class Table {
private List<Cell> currentCells;
private boolean inHeaders = false;
private boolean withTime = false;
private PaginationMetadata paginationMetadata = new PaginationMetadata(false, null, null);

public static final String EPOCH = "epoch";
public static final String TIMESTAMP = "timestamp";

public Table() {}

public Table(@Nullable PaginationMetadata paginationMetadata) {
if (paginationMetadata != null) {
this.paginationMetadata = paginationMetadata;
}
}

public Table startHeaders() {
inHeaders = true;
currentCells = new ArrayList<>();
Expand Down Expand Up @@ -230,6 +242,18 @@ public Map<String, String> getAliasMap() {
return headerAliasMap;
}

public boolean isTablePaginated() {
return paginationMetadata.isResponsePaginated;
}

public String getNextTokenForTable() {
return paginationMetadata.nextToken;
}

public String getPaginatedElementForTable() {
return paginationMetadata.paginatedElement;
}

/**
* Cell in a table
*
Expand All @@ -254,4 +278,34 @@ public Cell(Object value, Map<String, String> attr) {
this.attr = attr;
}
}

/**
* Pagination metadata for a table.
*
* @opensearch.internal
*/
public static class PaginationMetadata {

/**
* boolean denoting whether the table is paginated or not.
*/
public final boolean isResponsePaginated;

/**
* String denoting the element which is being paginated (for e.g. shards, indices..).
*/
public final String paginatedElement;

/**
* String denoting the nextToken of paginated response, which will be used to fetch nextPage (if any).
*/
public final String nextToken;

public PaginationMetadata(@NonNull boolean isResponsePaginated, @Nullable String paginatedElement, @Nullable String nextToken) {
this.isResponsePaginated = isResponsePaginated;
assert !isResponsePaginated || paginatedElement != null : "paginatedElement must be specified for a table which is paginated";
this.paginatedElement = paginatedElement;
this.nextToken = nextToken;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,26 @@
import org.opensearch.rest.RestResponse;
import org.opensearch.rest.action.RestResponseListener;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.Spliterators;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableList;
import static org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest.DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT;
Expand All @@ -95,6 +100,9 @@ public class RestIndicesAction extends AbstractCatAction {
"Parameter [master_timeout] is deprecated and will be removed in 3.0. To support inclusive language, please use [cluster_manager_timeout] instead.";
private static final String DUPLICATE_PARAMETER_ERROR_MESSAGE =
"Please only use one of the request parameters [master_timeout, cluster_manager_timeout].";
private static final String INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE =
"Parameter [nextToken] has been tainted and is incorrect. Please provide a valid [nextToken].";
private static final String DEFAULT_CAT_INDICES_PAGE_SIZE_STRING = "100";

@Override
public List<Route> routes() {
Expand All @@ -119,6 +127,9 @@ protected void documentation(StringBuilder sb) {

@Override
public RestChannelConsumer doCatRequest(final RestRequest request, final NodeClient client) {
if (request.hasParam("nextToken")) {
return doPaginatedCatRequest(request, client);
}
final String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
final IndicesOptions indicesOptions = IndicesOptions.fromRequest(request, IndicesOptions.strictExpand());
final boolean local = request.paramAsBoolean("local", false);
Expand Down Expand Up @@ -205,6 +216,113 @@ public void onFailure(final Exception e) {
};
}

public RestChannelConsumer doPaginatedCatRequest(final RestRequest request, final NodeClient client) {
// Paginated query will not read "indices" and "expand_wildcard" query parameters, and hence if specified with nextToken
// will result in "illegal_argument_exception".
TimeValue clusterManagerTimeout = request.paramAsTime("cluster_manager_timeout", DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT);
// Remove the if condition and statements inside after removing MASTER_ROLE.
if (request.hasParam("master_timeout")) {
deprecationLogger.deprecate("cat_indices_master_timeout_parameter", MASTER_TIMEOUT_DEPRECATED_MESSAGE);
if (request.hasParam("cluster_manager_timeout")) {
throw new OpenSearchParseException(DUPLICATE_PARAMETER_ERROR_MESSAGE);
}
clusterManagerTimeout = request.paramAsTime("master_timeout", DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT);
}
final boolean local = request.paramAsBoolean("local", false);
final TimeValue clusterManagerNodeTimeout = clusterManagerTimeout;
final boolean includeUnloadedSegments = request.paramAsBoolean("include_unloaded_segments", false);
// Get the nextToken and pageSize provided in the request
final String nextTokenInRequest = validateAndGetNextTokenFromRequest(request);
final int pageSize = Integer.parseInt(request.param("pageSize", DEFAULT_CAT_INDICES_PAGE_SIZE_STRING));

return channel -> {
final ActionListener<Table> listener = ActionListener.notifyOnce(new RestResponseListener<Table>(channel) {
@Override
public RestResponse buildResponse(final Table table) throws Exception {
return RestTable.buildResponse(table, channel);
}
});

// Fetch all the indices from clusterStateRequest for a paginated query.
sendClusterStateRequest(
new String[0],
IndicesOptions.lenientExpandHidden(),
local,
clusterManagerNodeTimeout,
client,
new ActionListener<ClusterStateResponse>() {
@Override
public void onResponse(final ClusterStateResponse clusterStateResponse) {
// Get a sorted list of indices from metadata and filter out the required number of indices
List<String> sortedIndicesList = getListOfIndicesSortedByCreateTime(clusterStateResponse);
final int newPageStartIndexNumber = getNewPageIndexStartNumber(
nextTokenInRequest,
sortedIndicesList,
clusterStateResponse
); // inclusive
int newPageEndIndexNumber = Math.min(newPageStartIndexNumber + pageSize, sortedIndicesList.size()); // exclusive
String[] indicesToBeQueried = sortedIndicesList.subList(newPageStartIndexNumber, newPageEndIndexNumber)
.toArray(new String[0]);
// Generate the new nextToken which is to be passed in the response
// NextToken = "IndexNumberToStartTheNextPageFrom + $ + CreationTimeOfLastRespondedIndex + $ +
// NameOfLastRespondedIndex" -> (1$12345678$testIndex)
final String nextToken = newPageEndIndexNumber >= sortedIndicesList.size()
? null
: Base64.getEncoder()
.encodeToString(
(newPageEndIndexNumber
+ "$"
+ clusterStateResponse.getState()
.metadata()
.indices()
.get(sortedIndicesList.get(newPageEndIndexNumber - 1))
.getCreationDate()
+ "$"
+ sortedIndicesList.get(newPageEndIndexNumber - 1)).getBytes(StandardCharsets.UTF_8)
);

final GroupedActionListener<ActionResponse> groupedListener = createGroupedListener(
request,
4,
listener,
new Table.PaginationMetadata(true, "indices", nextToken)
);
groupedListener.onResponse(clusterStateResponse);

sendGetSettingsRequest(
indicesToBeQueried,
IndicesOptions.fromPaginatedRequest(request, IndicesOptions.strictExpand()),
local,
clusterManagerNodeTimeout,
client,
ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure)
);
sendIndicesStatsRequest(
indicesToBeQueried,
IndicesOptions.lenientExpandHidden(),
includeUnloadedSegments,
client,
ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure)
);
sendClusterHealthRequest(
indicesToBeQueried,
IndicesOptions.lenientExpandHidden(),
local,
clusterManagerNodeTimeout,
client,
ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure)
);
}

@Override
public void onFailure(final Exception e) {
listener.onFailure(e);
}
}
);
};
}

/**
* We're using the Get Settings API here to resolve the authorized indices for the user.
* This is because the Cluster State and Cluster Health APIs do not filter output based
Expand Down Expand Up @@ -288,6 +406,15 @@ private GroupedActionListener<ActionResponse> createGroupedListener(
final RestRequest request,
final int size,
final ActionListener<Table> listener
) {
return createGroupedListener(request, size, listener, null);
}

private GroupedActionListener<ActionResponse> createGroupedListener(
final RestRequest request,
final int size,
final ActionListener<Table> listener,
final Table.PaginationMetadata paginationMetadata
) {
return new GroupedActionListener<>(new ActionListener<Collection<ActionResponse>>() {
@Override
Expand All @@ -311,7 +438,14 @@ public void onResponse(final Collection<ActionResponse> responses) {
IndicesStatsResponse statsResponse = extractResponse(responses, IndicesStatsResponse.class);
Map<String, IndexStats> indicesStats = statsResponse.getIndices();

Table responseTable = buildTable(request, indicesSettings, indicesHealths, indicesStats, indicesStates);
Table responseTable = buildTable(
request,
indicesSettings,
indicesHealths,
indicesStats,
indicesStates,
paginationMetadata
);
listener.onResponse(responseTable);
} catch (Exception e) {
onFailure(e);
Expand Down Expand Up @@ -340,7 +474,11 @@ protected Set<String> responseParams() {

@Override
protected Table getTableWithHeader(final RestRequest request) {
Table table = new Table();
return getTableWithHeader(request, null);
}

protected Table getTableWithHeader(final RestRequest request, final Table.PaginationMetadata paginationMetadata) {
Table table = new Table(paginationMetadata);
table.startHeaders();
table.addCell("health", "alias:h;desc:current health status");
table.addCell("status", "alias:s;desc:open/close status");
Expand Down Expand Up @@ -709,11 +847,12 @@ Table buildTable(
final Map<String, Settings> indicesSettings,
final Map<String, ClusterIndexHealth> indicesHealths,
final Map<String, IndexStats> indicesStats,
final Map<String, IndexMetadata> indicesMetadatas
final Map<String, IndexMetadata> indicesMetadatas,
final Table.PaginationMetadata paginationMetadata
) {

final String healthParam = request.param("health");
final Table table = getTableWithHeader(request);
final Table table = getTableWithHeader(request, paginationMetadata);

indicesSettings.forEach((indexName, settings) -> {
if (indicesMetadatas.containsKey(indexName) == false) {
Expand Down Expand Up @@ -991,4 +1130,77 @@ Table buildTable(
private static <A extends ActionResponse> A extractResponse(final Collection<? extends ActionResponse> responses, Class<A> c) {
return (A) responses.stream().filter(c::isInstance).findFirst().get();
}

private List<String> getListOfIndicesSortedByCreateTime(final ClusterStateResponse clusterStateResponse) {
List<String> indicesList = new ArrayList<String>(clusterStateResponse.getState().getRoutingTable().getIndicesRouting().keySet());
indicesList.sort((index1, index2) -> {
Long index1CreationTimeStamp = clusterStateResponse.getState().metadata().indices().get(index1).getCreationDate();
Long index2CreationTimeStamp = clusterStateResponse.getState().metadata().indices().get(index2).getCreationDate();
if (index1CreationTimeStamp.equals(index2CreationTimeStamp)) {
return index1.compareTo(index2);
}
return (index1CreationTimeStamp - index2CreationTimeStamp) > 0 ? 1 : -1;
});
return indicesList;
}

private String validateAndGetNextTokenFromRequest(RestRequest request) {
String nextTokenInRequest;
try {
nextTokenInRequest = Objects.equals(request.param("nextToken"), "null")
? null
: new String(Base64.getDecoder().decode(request.param("nextToken")), UTF_8);
} catch (IllegalArgumentException exception) {
throw new OpenSearchParseException(INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE);
}

if (Objects.isNull(nextTokenInRequest)) {
return null;
}
final String[] nextTokenElements = nextTokenInRequest.split("\\$");
if (nextTokenElements.length != 3) {
throw new OpenSearchParseException(INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE);
}

try {
int newPageStartIndexNumber = Integer.parseInt(nextTokenElements[0]);
long lastIndexCreationTime = Long.parseLong(nextTokenElements[1]);
if (newPageStartIndexNumber < 0 || lastIndexCreationTime < 0) {
throw new OpenSearchParseException(INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE);
}
} catch (NumberFormatException exception) {
throw new OpenSearchParseException(INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE);
}
return nextTokenInRequest;
}

private int getNewPageIndexStartNumber(
final String nextTokenInRequest,
final List<String> sortedIndicesList,
final ClusterStateResponse clusterStateResponse
) {
if (Objects.isNull(nextTokenInRequest)) {
return 0;
}
final String[] nextTokenElements = nextTokenInRequest.split("\\$");
int newPageStartIndexNumber = Math.min(Integer.parseInt(nextTokenElements[0]), sortedIndicesList.size());
long lastIndexCreationTime = Long.parseLong(nextTokenElements[1]);
String lastIndexName = nextTokenElements[2];
if (newPageStartIndexNumber > 0 && !Objects.equals(sortedIndicesList.get(newPageStartIndexNumber - 1), lastIndexName)) {
// case denoting an already responded index has been deleted while the paginated queries are being executed
// find the index whose creation time is just after the index which was last responded
newPageStartIndexNumber--;
while (newPageStartIndexNumber > 0) {
if (clusterStateResponse.getState()
.metadata()
.indices()
.get(sortedIndicesList.get(newPageStartIndexNumber - 1))
.getCreationDate() < lastIndexCreationTime) {
break;
}
newPageStartIndexNumber--;
}
}
return newPageStartIndexNumber;
}
}
Loading

0 comments on commit 4924a71

Please sign in to comment.