diff --git a/CHANGELOG.md b/CHANGELOG.md
index a5355f010a99f..bd7101db17681 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 ### Changed
 - Add lower limit for primary and replica batch allocators timeout ([#14979](https://github.com/opensearch-project/OpenSearch/pull/14979))
+- Optimize _cat/nodes api by sending request to each node and receiving response separately ([#14853](https://github.com/opensearch-project/OpenSearch/pull/14853))
 
 ### Deprecated
 
 
diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/HttpCatIT.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/HttpCatIT.java
new file mode 100644
index 0000000000000..35e49276762b2
--- /dev/null
+++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/HttpCatIT.java
@@ -0,0 +1,58 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.http;
+
+import org.apache.hc.core5.http.ParseException;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.opensearch.client.Request;
+import org.opensearch.client.Response;
+import org.opensearch.client.ResponseException;
+import org.opensearch.client.RestClient;
+import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
+import org.opensearch.test.OpenSearchIntegTestCase.Scope;
+
+import java.io.IOException;
+
+import static org.apache.hc.core5.http.HttpStatus.SC_OK;
+import static org.hamcrest.Matchers.containsString;
+
+@ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 5, numClientNodes = 0)
+public class HttpCatIT extends HttpSmokeTestCase {
+
+    public void testDoCatRequest() throws IOException, ParseException {
+        try (RestClient restClient = getRestClient()) {
+            int nodesCount = restClient.getNodes().size();
+            assertEquals(5, nodesCount);
+
+            for (int i = 0; i < 2; i++) {
+                Request nodesRequest = new Request("GET", "/_cat/nodes");
+                Response response = restClient.performRequest(nodesRequest);
+                assertEquals(SC_OK, response.getStatusLine().getStatusCode());
+                String result = EntityUtils.toString(response.getEntity());
+                String[] nodeInfos = result.split("\n");
+                assertEquals(nodesCount, nodeInfos.length);
+            }
+
+            for (int i = 1; i < 1500; i += 50) {
+                Request nodesRequest = new Request("GET", "/_cat/nodes?timeout=" + i + "ms");
+                try {
+                    Response response = restClient.performRequest(nodesRequest);
+                    assertEquals(SC_OK, response.getStatusLine().getStatusCode());
+                    String result = EntityUtils.toString(response.getEntity());
+                    String[] nodeInfos = result.split("\n");
+                    assertEquals(nodesCount, nodeInfos.length);
+                } catch (ResponseException e) {
+                    // it means that it takes too long to get ClusterState from the cluster manager.
+                    assertThat(e.getMessage(), containsString("There is not enough time to obtain nodesInfo metric from the cluster manager"));
+                }
+            }
+        }
+    }
+
+}
diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java
index 0330fe627ccd0..6eb3aa5bc2037 100644
--- a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java
+++ b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java
@@ -32,6 +32,8 @@
 
 package org.opensearch.rest.action.cat;
 
+import org.opensearch.OpenSearchTimeoutException;
+import org.opensearch.action.FailedNodeException;
 import org.opensearch.action.admin.cluster.node.info.NodeInfo;
 import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
 import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
@@ -47,6 +49,8 @@
 import org.opensearch.common.Table;
 import org.opensearch.common.logging.DeprecationLogger;
 import org.opensearch.common.network.NetworkAddress;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.core.action.ActionListener;
 import org.opensearch.core.common.Strings;
 import org.opensearch.core.common.transport.TransportAddress;
 import org.opensearch.core.common.unit.ByteSizeValue;
@@ -68,15 +72,20 @@
 import org.opensearch.monitor.os.OsStats;
 import org.opensearch.monitor.process.ProcessInfo;
 import org.opensearch.monitor.process.ProcessStats;
+import org.opensearch.rest.RestChannel;
 import org.opensearch.rest.RestRequest;
-import org.opensearch.rest.RestResponse;
 import org.opensearch.rest.action.RestActionListener;
-import org.opensearch.rest.action.RestResponseListener;
 import org.opensearch.script.ScriptStats;
 import org.opensearch.search.suggest.completion.CompletionStats;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Locale;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import static java.util.Collections.singletonList;
@@ -88,6 +97,7 @@
  * @opensearch.api
  */
 public class RestNodesAction extends AbstractCatAction {
+    public static final long TIMEOUT_THRESHOLD_NANO = 5_000_000;
     private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(RestNodesAction.class);
     static final String LOCAL_DEPRECATED_MESSAGE = "Deprecated parameter [local] used. This parameter does not cause this API to act "
         + "locally, and should not be used. It will be unsupported in version 8.0.";
@@ -120,49 +130,177 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli
         );
         parseDeprecatedMasterTimeoutParameter(clusterStateRequest, request, deprecationLogger, getName());
         final boolean fullId = request.paramAsBoolean("full_id", false);
-        return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener<ClusterStateResponse>(channel) {
+        final long beginTimeNano = System.nanoTime();
+        final long timeoutNano = request.hasParam("timeout")
+            ? TimeValue.parseTimeValue(request.param("timeout"), "timeout").nanos()
+            : Long.MAX_VALUE;
+        return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener<>(channel) {
             @Override
             public void processResponse(final ClusterStateResponse clusterStateResponse) {
-                NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
-                nodesInfoRequest.timeout(request.param("timeout"));
-                nodesInfoRequest.setIncludeDiscoveryNodes(false);
-                nodesInfoRequest.clear()
-                    .addMetrics(
-                        NodesInfoRequest.Metric.JVM.metricName(),
-                        NodesInfoRequest.Metric.OS.metricName(),
-                        NodesInfoRequest.Metric.PROCESS.metricName(),
-                        NodesInfoRequest.Metric.HTTP.metricName()
+                long leftTimeNano = timeoutNano - System.nanoTime() + beginTimeNano;
+                if (leftTimeNano < TIMEOUT_THRESHOLD_NANO) {
+                    onFailure(
+                        new OpenSearchTimeoutException(
+                            "There is not enough time to obtain nodesInfo metric from the cluster manager:"
+                                + clusterStateResponse.getState().nodes().getMasterNode().getName()
+                        )
                     );
-                client.admin().cluster().nodesInfo(nodesInfoRequest, new RestActionListener<NodesInfoResponse>(channel) {
-                    @Override
-                    public void processResponse(final NodesInfoResponse nodesInfoResponse) {
-                        NodesStatsRequest nodesStatsRequest = new NodesStatsRequest();
-                        nodesStatsRequest.timeout(request.param("timeout"));
-                        nodesStatsRequest.setIncludeDiscoveryNodes(false);
-                        nodesStatsRequest.clear()
-                            .indices(true)
-                            .addMetrics(
-                                NodesStatsRequest.Metric.JVM.metricName(),
-                                NodesStatsRequest.Metric.OS.metricName(),
-                                NodesStatsRequest.Metric.FS.metricName(),
-                                NodesStatsRequest.Metric.PROCESS.metricName(),
-                                NodesStatsRequest.Metric.SCRIPT.metricName()
+                    return;
+                }
+                String[] nodeIds = clusterStateResponse.getState().nodes().resolveNodes();
+                ConcurrentMap<String, NodeInfo> successNodeInfos = new ConcurrentHashMap<>(nodeIds.length);
+                ConcurrentMap<String, FailedNodeException> failNodeInfos = new ConcurrentHashMap<>(nodeIds.length);
+                ConcurrentMap<String, NodeStats> successNodeStats = new ConcurrentHashMap<>(nodeIds.length);
+                ConcurrentMap<String, FailedNodeException> failNodeStats = new ConcurrentHashMap<>(nodeIds.length);
+                AtomicInteger counter = new AtomicInteger();
+                for (String nodeId : nodeIds) {
+                    NodesInfoRequest nodesInfoRequest = createNodesInfoRequest(timeoutNano, leftTimeNano, nodeId);
+                    nodesInfoRequest.setIncludeDiscoveryNodes(false);
+                    client.admin().cluster().nodesInfo(nodesInfoRequest, new ActionListener<>() {
+                        @Override
+                        public void onResponse(NodesInfoResponse nodesInfoResponse) {
+                            assert nodesInfoResponse.getNodes().size() + nodesInfoResponse.failures().size() == 1;
+                            NodesStatsRequest nodesStatsRequest = checkAndCreateNodesStatsRequest(
+                                nodesInfoResponse.failures(),
+                                timeoutNano,
+                                beginTimeNano,
+                                nodeId,
+                                this::onFailure,
+                                clusterStateResponse.getState().nodes().get(nodeId).getName()
                             );
-                        client.admin().cluster().nodesStats(nodesStatsRequest, new RestResponseListener<NodesStatsResponse>(channel) {
-                            @Override
-                            public RestResponse buildResponse(NodesStatsResponse nodesStatsResponse) throws Exception {
-                                return RestTable.buildResponse(
-                                    buildTable(fullId, request, clusterStateResponse, nodesInfoResponse, nodesStatsResponse),
-                                    channel
-                                );
+                            if (nodesStatsRequest == null) {
+                                return;
                             }
-                        });
-                    }
-                });
+                            successNodeInfos.put(nodeId, nodesInfoResponse.getNodes().get(0));
+                            client.admin().cluster().nodesStats(nodesStatsRequest, ActionListener.runAfter(new ActionListener<>() {
+                                @Override
+                                public void onResponse(NodesStatsResponse nodesStatsResponse) {
+                                    assert nodesStatsResponse.getNodes().size() + nodesStatsResponse.failures().size() == 1;
+                                    if (nodesStatsResponse.getNodes().size() == 1) {
+                                        successNodeStats.put(nodeId, nodesStatsResponse.getNodes().get(0));
+                                    } else {
+                                        failNodeStats.put(nodeId, nodesStatsResponse.failures().get(0));
+                                    }
+                                }
+
+                                @Override
+                                public void onFailure(Exception e) {
+                                    assert e instanceof FailedNodeException;
+                                    failNodeStats.put(nodeId, (FailedNodeException) e);
+                                }
+                            }, this::onOperation));
+                        }
+
+                        @Override
+                        public void onFailure(Exception e) {
+                            assert e instanceof FailedNodeException;
+                            failNodeInfos.put(nodeId, (FailedNodeException) e);
+                            onOperation();
+                        }
+
+                        private void onOperation() {
+                            if (counter.incrementAndGet() == nodeIds.length) {
+                                try {
+                                    sendResponse(
+                                        channel,
+                                        clusterStateResponse,
+                                        request,
+                                        fullId,
+                                        successNodeInfos.values(),
+                                        failNodeInfos.values(),
+                                        successNodeStats.values(),
+                                        failNodeStats.values()
+                                    );
+                                } catch (Exception e) {
+                                    logger.error("failed to send response", e);
+                                }
+                            }
+                        }
+                    });
+                }
             }
         });
     }
 
+    /** Builds a single-node nodesInfo request carrying only the remaining time budget. */
+    private NodesInfoRequest createNodesInfoRequest(long timeoutNano, long leftTimeNano, String nodeId) {
+        NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
+        if (timeoutNano != Long.MAX_VALUE) {
+            nodesInfoRequest.timeout(TimeValue.timeValueNanos(leftTimeNano));
+        }
+        nodesInfoRequest.clear()
+            .nodesIds(nodeId)
+            .addMetrics(
+                NodesInfoRequest.Metric.JVM.metricName(),
+                NodesInfoRequest.Metric.OS.metricName(),
+                NodesInfoRequest.Metric.PROCESS.metricName(),
+                NodesInfoRequest.Metric.HTTP.metricName()
+            );
+        return nodesInfoRequest;
+    }
+
+    private NodesStatsRequest checkAndCreateNodesStatsRequest(
+        List<FailedNodeException> failedNodeExceptions,
+        long timeoutNano,
+        long beginTimeNano,
+        String nodeId,
+        Consumer<FailedNodeException> failedConsumer,
+        String nodeName
+    ) {
+        if (failedNodeExceptions.isEmpty() == false) {
+            failedConsumer.accept(failedNodeExceptions.get(0));
+            return null;
+        }
+        long leftTime = timeoutNano - System.nanoTime() + beginTimeNano;
+        if (leftTime < TIMEOUT_THRESHOLD_NANO) {
+            failedConsumer.accept(
+                new FailedNodeException(nodeId, "There is not enough time to obtain nodesStats metric from " + nodeName, null)
+            );
+            return null;
+        }
+        NodesStatsRequest nodesStatsRequest = new NodesStatsRequest();
+        if (timeoutNano != Long.MAX_VALUE) {
+            nodesStatsRequest.timeout(TimeValue.timeValueNanos(leftTime));
+        }
+        nodesStatsRequest.setIncludeDiscoveryNodes(false);
+        nodesStatsRequest.clear()
+            .nodesIds(nodeId)
+            .indices(true)
+            .addMetrics(
+                NodesStatsRequest.Metric.JVM.metricName(),
+                NodesStatsRequest.Metric.OS.metricName(),
+                NodesStatsRequest.Metric.FS.metricName(),
+                NodesStatsRequest.Metric.PROCESS.metricName(),
+                NodesStatsRequest.Metric.SCRIPT.metricName()
+            );
+        return nodesStatsRequest;
+    }
+
+    private void sendResponse(
+        RestChannel channel,
+        ClusterStateResponse clusterStateResponse,
+        RestRequest request,
+        boolean fullId,
+        Collection<NodeInfo> successNodeInfos,
+        Collection<FailedNodeException> failNodeInfos,
+        Collection<NodeStats> successNodeStats,
+        Collection<FailedNodeException> failNodeStats
+    ) throws Exception {
+        NodesInfoResponse nodesInfoResponse = new NodesInfoResponse(
+            clusterStateResponse.getClusterName(),
+            new ArrayList<>(successNodeInfos),
+            new ArrayList<>(failNodeInfos)
+        );
+        NodesStatsResponse nodesStatsResponse = new NodesStatsResponse(
+            clusterStateResponse.getClusterName(),
+            new ArrayList<>(successNodeStats),
+            new ArrayList<>(failNodeStats)
+        );
+        channel.sendResponse(
+            RestTable.buildResponse(buildTable(fullId, request, clusterStateResponse, nodesInfoResponse, nodesStatsResponse), channel)
+        );
+    }
+
     @Override
     protected Table getTableWithHeader(final RestRequest request) {
         Table table = new Table();