Skip to content

Commit

Permalink
[Backport 2.x] Add cluster manager throttling task stats in nodes sta…
Browse files Browse the repository at this point in the history
…ts API (#5871)

* [Backport 2.x] Add throttling stats in nodes stats API

Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
  • Loading branch information
dhwanilpatel committed Jan 17, 2023
1 parent d3d0b40 commit 29bd1ef
Show file tree
Hide file tree
Showing 13 changed files with 177 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 2.x]

### Added
- Added cluster manager throttling stats in nodes/stats API ([#5790](https://github.com/opensearch-project/OpenSearch/pull/5790))
### Dependencies
- Update nebula-publishing-plugin to 19.2.0 ([#5704](https://github.com/opensearch-project/OpenSearch/pull/5704))
### Changed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.service.ClusterManagerThrottlingStats;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -123,6 +124,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private SearchBackpressureStats searchBackpressureStats;

@Nullable
private ClusterManagerThrottlingStats clusterManagerThrottlingStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand Down Expand Up @@ -165,6 +169,12 @@ public NodeStats(StreamInput in) throws IOException {
} else {
searchBackpressureStats = null;
}

if (in.getVersion().onOrAfter(Version.V_2_6_0)) {
clusterManagerThrottlingStats = in.readOptionalWriteable(ClusterManagerThrottlingStats::new);
} else {
clusterManagerThrottlingStats = null;
}
}

public NodeStats(
Expand All @@ -186,7 +196,8 @@ public NodeStats(
@Nullable ScriptCacheStats scriptCacheStats,
@Nullable IndexingPressureStats indexingPressureStats,
@Nullable ShardIndexingPressureStats shardIndexingPressureStats,
@Nullable SearchBackpressureStats searchBackpressureStats
@Nullable SearchBackpressureStats searchBackpressureStats,
@Nullable ClusterManagerThrottlingStats clusterManagerThrottlingStats
) {
super(node);
this.timestamp = timestamp;
Expand All @@ -207,6 +218,7 @@ public NodeStats(
this.indexingPressureStats = indexingPressureStats;
this.shardIndexingPressureStats = shardIndexingPressureStats;
this.searchBackpressureStats = searchBackpressureStats;
this.clusterManagerThrottlingStats = clusterManagerThrottlingStats;
}

public long getTimestamp() {
Expand Down Expand Up @@ -321,6 +333,11 @@ public SearchBackpressureStats getSearchBackpressureStats() {
return searchBackpressureStats;
}

@Nullable
public ClusterManagerThrottlingStats getClusterManagerThrottlingStats() {
return clusterManagerThrottlingStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down Expand Up @@ -355,6 +372,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_4_0)) {
out.writeOptionalWriteable(searchBackpressureStats);
}
if (out.getVersion().onOrAfter(Version.V_2_6_0)) {
out.writeOptionalWriteable(clusterManagerThrottlingStats);
}
}

@Override
Expand Down Expand Up @@ -430,6 +450,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getSearchBackpressureStats() != null) {
getSearchBackpressureStats().toXContent(builder, params);
}
if (getClusterManagerThrottlingStats() != null) {
getClusterManagerThrottlingStats().toXContent(builder, params);
}
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@ public enum Metric {
SCRIPT_CACHE("script_cache"),
INDEXING_PRESSURE("indexing_pressure"),
SHARD_INDEXING_PRESSURE("shard_indexing_pressure"),
SEARCH_BACKPRESSURE("search_backpressure");
SEARCH_BACKPRESSURE("search_backpressure"),
CLUSTER_MANAGER_THROTTLING("cluster_manager_throttling");

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.SCRIPT_CACHE.containedIn(metrics),
NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.SHARD_INDEXING_PRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.SEARCH_BACKPRESSURE.containedIn(metrics)
NodesStatsRequest.Metric.SEARCH_BACKPRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.CLUSTER_MANAGER_THROTTLING.containedIn(metrics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
);
List<ShardStats> shardsStats = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,29 @@

package org.opensearch.cluster.service;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.ToXContentFragment;
import org.opensearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Contains stats of Cluster Manager Task Throttling.
* It stores the total cumulative count of throttled tasks per task type.
*/
public class ClusterManagerThrottlingStats implements ClusterManagerTaskThrottlerListener {
public class ClusterManagerThrottlingStats implements ClusterManagerTaskThrottlerListener, Writeable, ToXContentFragment {

private Map<String, CounterMetric> throttledTasksCount = new ConcurrentHashMap<>();
private Map<String, CounterMetric> throttledTasksCount;

public ClusterManagerThrottlingStats() {
throttledTasksCount = new ConcurrentHashMap<>();
}

private void incrementThrottlingCount(String type, final int counts) {
throttledTasksCount.computeIfAbsent(type, k -> new CounterMetric()).inc(counts);
Expand All @@ -39,4 +50,63 @@ public long getTotalThrottledTaskCount() {
public void onThrottle(String type, int counts) {
incrementThrottlingCount(type, counts);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(throttledTasksCount.size());
for (Map.Entry<String, CounterMetric> entry : throttledTasksCount.entrySet()) {
out.writeString(entry.getKey());
out.writeVInt((int) entry.getValue().count());
}
}

public ClusterManagerThrottlingStats(StreamInput in) throws IOException {
int throttledTaskEntries = in.readVInt();
throttledTasksCount = new ConcurrentHashMap<>();
for (int i = 0; i < throttledTaskEntries; i++) {
String taskType = in.readString();
int throttledTaskCount = in.readVInt();
onThrottle(taskType, throttledTaskCount);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject("cluster_manager_throttling");
builder.startObject("stats");
builder.field("total_throttled_tasks", getTotalThrottledTaskCount());
builder.startObject("throttled_tasks_per_task_type");
for (Map.Entry<String, CounterMetric> entry : throttledTasksCount.entrySet()) {
builder.field(entry.getKey(), entry.getValue().count());
}
builder.endObject();
builder.endObject();
return builder.endObject();
}

public boolean equals(Object o) {
if (this == o) {
return true;
} else if (o != null && this.getClass() == o.getClass()) {
ClusterManagerThrottlingStats that = (ClusterManagerThrottlingStats) o;

if (this.throttledTasksCount.size() == that.throttledTasksCount.size()) {
for (Map.Entry<String, CounterMetric> entry : this.throttledTasksCount.entrySet()) {
if (that.throttledTasksCount.get(entry.getKey()).count() != entry.getValue().count()) {
return false;
}
}
return true;
}
}
return false;
}

public int hashCode() {
Map<String, Long> countMap = new ConcurrentHashMap<>();
for (Map.Entry<String, CounterMetric> entry : this.throttledTasksCount.entrySet()) {
countMap.put(entry.getKey(), entry.getValue().count());
}
return countMap.hashCode();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,13 @@ public long numberOfThrottledPendingTasks() {
return throttlingStats.getTotalThrottledTaskCount();
}

/**
* Returns the stats of throttled pending tasks.
*/
public ClusterManagerThrottlingStats getThrottlingStats() {
return throttlingStats;
}

/**
* Returns the min version of nodes in cluster
*/
Expand Down
8 changes: 6 additions & 2 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class NodeService implements Closeable {
private final IndexingPressureService indexingPressureService;
private final AggregationUsageService aggregationUsageService;
private final SearchBackpressureService searchBackpressureService;
private final ClusterService clusterService;

private final Discovery discovery;

Expand Down Expand Up @@ -123,6 +124,7 @@ public class NodeService implements Closeable {
this.indexingPressureService = indexingPressureService;
this.aggregationUsageService = aggregationUsageService;
this.searchBackpressureService = searchBackpressureService;
this.clusterService = clusterService;
clusterService.addStateApplier(ingestService);
}

Expand Down Expand Up @@ -174,7 +176,8 @@ public NodeStats stats(
boolean scriptCache,
boolean indexingPressure,
boolean shardIndexingPressure,
boolean searchBackpressure
boolean searchBackpressure,
boolean clusterManagerThrottling
) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
Expand All @@ -197,7 +200,8 @@ public NodeStats stats(
scriptCache ? scriptService.cacheStats() : null,
indexingPressure ? this.indexingPressureService.nodeStats() : null,
shardIndexingPressure ? this.indexingPressureService.shardStats(indices) : null,
searchBackpressure ? this.searchBackpressureService.nodeStats() : null
searchBackpressure ? this.searchBackpressureService.nodeStats() : null,
clusterManagerThrottling ? this.clusterService.getClusterManagerService().getThrottlingStats() : null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.action.admin.cluster.node.stats;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterManagerThrottlingStats;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.discovery.DiscoveryStats;
Expand Down Expand Up @@ -418,6 +419,21 @@ public void testSerialization() throws IOException {
assertEquals(limited, sum.getCompilationLimitTriggered());
assertEquals(compilations, sum.getCompilations());
}
ClusterManagerThrottlingStats clusterManagerThrottlingStats = nodeStats.getClusterManagerThrottlingStats();
ClusterManagerThrottlingStats deserializedClusterManagerThrottlingStats = deserializedNodeStats
.getClusterManagerThrottlingStats();
if (clusterManagerThrottlingStats == null) {
assertNull(deserializedClusterManagerThrottlingStats);
} else {
assertEquals(
clusterManagerThrottlingStats.getTotalThrottledTaskCount(),
deserializedClusterManagerThrottlingStats.getTotalThrottledTaskCount()
);
assertEquals(
clusterManagerThrottlingStats.getThrottlingCount("test-task"),
deserializedClusterManagerThrottlingStats.getThrottlingCount("test-task")
);
}
}
}
}
Expand Down Expand Up @@ -689,6 +705,11 @@ public static NodeStats createNodeStats() {
}
adaptiveSelectionStats = new AdaptiveSelectionStats(nodeConnections, nodeStats);
}
ClusterManagerThrottlingStats clusterManagerThrottlingStats = null;
if (frequently()) {
clusterManagerThrottlingStats = new ClusterManagerThrottlingStats();
clusterManagerThrottlingStats.onThrottle("test-task", randomInt());
}
ScriptCacheStats scriptCacheStats = scriptStats != null ? scriptStats.toScriptCacheStats() : null;
// TODO NodeIndicesStats are not tested here, way too complicated to create, also they need to be migrated to Writeable yet
return new NodeStats(
Expand All @@ -710,7 +731,8 @@ public static NodeStats createNodeStats() {
scriptCacheStats,
null,
null,
null
null,
clusterManagerThrottlingStats
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ public void testFillDiskUsage() {
null,
null,
null,
null,
null
),
new NodeStats(
Expand All @@ -207,6 +208,7 @@ public void testFillDiskUsage() {
null,
null,
null,
null,
null
),
new NodeStats(
Expand All @@ -228,6 +230,7 @@ public void testFillDiskUsage() {
null,
null,
null,
null,
null
)
);
Expand Down Expand Up @@ -280,6 +283,7 @@ public void testFillDiskUsageSomeInvalidValues() {
null,
null,
null,
null,
null
),
new NodeStats(
Expand All @@ -301,6 +305,7 @@ public void testFillDiskUsageSomeInvalidValues() {
null,
null,
null,
null,
null
),
new NodeStats(
Expand All @@ -322,6 +327,7 @@ public void testFillDiskUsageSomeInvalidValues() {
null,
null,
null,
null,
null
)
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.service;

import org.opensearch.common.io.stream.Writeable;
import org.opensearch.test.AbstractWireSerializingTestCase;

public class ClusterManagerThrottlingStatsTests extends AbstractWireSerializingTestCase<ClusterManagerThrottlingStats> {
@Override
protected Writeable.Reader<ClusterManagerThrottlingStats> instanceReader() {
return ClusterManagerThrottlingStats::new;
}

@Override
protected ClusterManagerThrottlingStats createTestInstance() {
return randomInstance();
}

public static ClusterManagerThrottlingStats randomInstance() {
ClusterManagerThrottlingStats randomStats = new ClusterManagerThrottlingStats();
randomStats.onThrottle(randomAlphaOfLengthBetween(3, 10), randomInt());
return randomStats;
}
}
Loading

0 comments on commit 29bd1ef

Please sign in to comment.