diff --git a/CHANGELOG.md b/CHANGELOG.md index 60ab1c09ac8b7..3ee7fb8103228 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Support for returning scores in matched queries ([#11626](https://github.com/opensearch-project/OpenSearch/pull/11626)) - Add shard id property to SearchLookup for use in field types provided by plugins ([#1063](https://github.com/opensearch-project/OpenSearch/pull/1063)) - Add kuromoji_completion analyzer and filter ([#4835](https://github.com/opensearch-project/OpenSearch/issues/4835)) +- [Admission Control] Integrate IO Usage Tracker to the Resource Usage Collector Service and Emit IO Usage Stats ([#11880](https://github.com/opensearch-project/OpenSearch/pull/11880)) ### Dependencies - Bump `com.squareup.okio:okio` from 3.7.0 to 3.8.0 ([#12290](https://github.com/opensearch-project/OpenSearch/pull/12290)) diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index e79112750f69d..cda99490937b7 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -669,6 +669,7 @@ public void apply(Settings value, Settings current, Settings previous) { // Settings related to resource trackers ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING, ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING, + ResourceTrackerSettings.GLOBAL_IO_USAGE_AC_WINDOW_DURATION_SETTING, // Settings related to Searchable Snapshots Node.NODE_SEARCH_CACHE_SIZE_SETTING, diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java b/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java index 9ea803aa58f5a..e8f2391354604 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java @@ -450,9 +450,13 @@ public long ioTimeInMillis() { return (currentIOTime - previousIOTime); } + public String getDeviceName() { + return deviceName; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.field("device_name", deviceName); + builder.field("device_name", getDeviceName()); builder.field(IoStats.OPERATIONS, operations()); builder.field(IoStats.READ_OPERATIONS, readOperations()); builder.field(IoStats.WRITE_OPERATIONS, writeOperations()); diff --git a/server/src/main/java/org/opensearch/node/IoUsageStats.java b/server/src/main/java/org/opensearch/node/IoUsageStats.java new file mode 100644 index 0000000000000..ecb1ac1bb1de4 --- /dev/null +++ b/server/src/main/java/org/opensearch/node/IoUsageStats.java @@ -0,0 +1,69 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.node; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Locale; + +/** + * This class is to store tne IO Usage Stats and used to return in node stats API. + */ +public class IoUsageStats implements Writeable, ToXContentFragment { + + private double ioUtilisationPercent; + + public IoUsageStats(double ioUtilisationPercent) { + this.ioUtilisationPercent = ioUtilisationPercent; + } + + /** + * + * @param in the stream to read from + * @throws IOException if an error occurs while reading from the StreamOutput + */ + public IoUsageStats(StreamInput in) throws IOException { + this.ioUtilisationPercent = in.readDouble(); + } + + /** + * Write this into the {@linkplain StreamOutput}. + * + * @param out the output stream to write entity content to + */ + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeDouble(this.ioUtilisationPercent); + } + + public double getIoUtilisationPercent() { + return ioUtilisationPercent; + } + + public void setIoUtilisationPercent(double ioUtilisationPercent) { + this.ioUtilisationPercent = ioUtilisationPercent; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("max_io_utilization_percent", String.format(Locale.ROOT, "%.1f", this.ioUtilisationPercent)); + return builder.endObject(); + } + + @Override + public String toString() { + return "IO utilization percent: " + String.format(Locale.ROOT, "%.1f", this.ioUtilisationPercent); + } +} diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index e75db3f606da4..8ea5dadf7cb0d 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -919,6 +919,7 @@ protected Node( final RestController restController = actionModule.getRestController(); final NodeResourceUsageTracker nodeResourceUsageTracker = new NodeResourceUsageTracker( + monitorService.fsService(), threadPool, settings, clusterService.getClusterSettings() diff --git a/server/src/main/java/org/opensearch/node/NodeResourceUsageStats.java b/server/src/main/java/org/opensearch/node/NodeResourceUsageStats.java index 6ef66d4ac1914..26e53218cf026 100644 --- a/server/src/main/java/org/opensearch/node/NodeResourceUsageStats.java +++ b/server/src/main/java/org/opensearch/node/NodeResourceUsageStats.java @@ -8,6 +8,7 @@ package org.opensearch.node; +import org.opensearch.Version; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; @@ -24,12 +25,20 @@ public class NodeResourceUsageStats implements Writeable { long timestamp; double cpuUtilizationPercent; double memoryUtilizationPercent; + private IoUsageStats ioUsageStats; - public NodeResourceUsageStats(String nodeId, long timestamp, double memoryUtilizationPercent, double cpuUtilizationPercent) { + public NodeResourceUsageStats( + String nodeId, + long timestamp, + double memoryUtilizationPercent, + double cpuUtilizationPercent, + IoUsageStats ioUsageStats + ) { this.nodeId = nodeId; this.timestamp = timestamp; this.cpuUtilizationPercent = cpuUtilizationPercent; this.memoryUtilizationPercent = memoryUtilizationPercent; + this.ioUsageStats = ioUsageStats; } public NodeResourceUsageStats(StreamInput in) throws IOException { @@ -37,6 +46,11 @@ public NodeResourceUsageStats(StreamInput in) throws IOException { this.timestamp = in.readLong(); this.cpuUtilizationPercent = in.readDouble(); this.memoryUtilizationPercent = in.readDouble(); + if (in.getVersion().onOrAfter(Version.V_2_13_0)) { + this.ioUsageStats = in.readOptionalWriteable(IoUsageStats::new); + } else { + this.ioUsageStats = null; + } } @Override @@ -45,6 +59,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(this.timestamp); out.writeDouble(this.cpuUtilizationPercent); out.writeDouble(this.memoryUtilizationPercent); + if (out.getVersion().onOrAfter(Version.V_2_13_0)) { + out.writeOptionalWriteable(this.ioUsageStats); + } } @Override @@ -52,8 +69,11 @@ public String toString() { StringBuilder sb = new StringBuilder("NodeResourceUsageStats["); sb.append(nodeId).append("]("); sb.append("Timestamp: ").append(timestamp); - sb.append(", CPU utilization percent: ").append(String.format(Locale.ROOT, "%.1f", cpuUtilizationPercent)); - sb.append(", Memory utilization percent: ").append(String.format(Locale.ROOT, "%.1f", memoryUtilizationPercent)); + sb.append(", CPU utilization percent: ").append(String.format(Locale.ROOT, "%.1f", this.getCpuUtilizationPercent())); + sb.append(", Memory utilization percent: ").append(String.format(Locale.ROOT, "%.1f", this.getMemoryUtilizationPercent())); + if (this.ioUsageStats != null) { + sb.append(", ").append(this.getIoUsageStats()); + } sb.append(")"); return sb.toString(); } @@ -63,7 +83,8 @@ public String toString() { nodeResourceUsageStats.nodeId, nodeResourceUsageStats.timestamp, nodeResourceUsageStats.memoryUtilizationPercent, - nodeResourceUsageStats.cpuUtilizationPercent + nodeResourceUsageStats.cpuUtilizationPercent, + nodeResourceUsageStats.ioUsageStats ); } @@ -75,6 +96,14 @@ public double getCpuUtilizationPercent() { return cpuUtilizationPercent; } + public IoUsageStats getIoUsageStats() { + return ioUsageStats; + } + + public void setIoUsageStats(IoUsageStats ioUsageStats) { + this.ioUsageStats = ioUsageStats; + } + public long getTimestamp() { return timestamp; } diff --git a/server/src/main/java/org/opensearch/node/NodesResourceUsageStats.java b/server/src/main/java/org/opensearch/node/NodesResourceUsageStats.java index 3dff9a27f71a8..35c82c904ad1c 100644 --- a/server/src/main/java/org/opensearch/node/NodesResourceUsageStats.java +++ b/server/src/main/java/org/opensearch/node/NodesResourceUsageStats.java @@ -60,6 +60,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws "memory_utilization_percent", String.format(Locale.ROOT, "%.1f", resourceUsageStats.memoryUtilizationPercent) ); + if (resourceUsageStats.getIoUsageStats() != null) { + builder.field("io_usage_stats", resourceUsageStats.getIoUsageStats()); + } } builder.endObject(); } diff --git a/server/src/main/java/org/opensearch/node/ResourceUsageCollectorService.java b/server/src/main/java/org/opensearch/node/ResourceUsageCollectorService.java index f1c763e09f147..ecd2a5615e1fe 100644 --- a/server/src/main/java/org/opensearch/node/ResourceUsageCollectorService.java +++ b/server/src/main/java/org/opensearch/node/ResourceUsageCollectorService.java @@ -78,14 +78,16 @@ public void collectNodeResourceUsageStats( String nodeId, long timestamp, double memoryUtilizationPercent, - double cpuUtilizationPercent + double cpuUtilizationPercent, + IoUsageStats ioUsageStats ) { nodeIdToResourceUsageStats.compute(nodeId, (id, resourceUsageStats) -> { if (resourceUsageStats == null) { - return new NodeResourceUsageStats(nodeId, timestamp, memoryUtilizationPercent, cpuUtilizationPercent); + return new NodeResourceUsageStats(nodeId, timestamp, memoryUtilizationPercent, cpuUtilizationPercent, ioUsageStats); } else { resourceUsageStats.cpuUtilizationPercent = cpuUtilizationPercent; resourceUsageStats.memoryUtilizationPercent = memoryUtilizationPercent; + resourceUsageStats.setIoUsageStats(ioUsageStats); resourceUsageStats.timestamp = timestamp; return resourceUsageStats; } @@ -129,7 +131,8 @@ private void collectLocalNodeResourceUsageStats() { clusterService.state().nodes().getLocalNodeId(), System.currentTimeMillis(), nodeResourceUsageTracker.getMemoryUtilizationPercent(), - nodeResourceUsageTracker.getCpuUtilizationPercent() + nodeResourceUsageTracker.getCpuUtilizationPercent(), + nodeResourceUsageTracker.getIoUsageStats() ); } } diff --git a/server/src/main/java/org/opensearch/node/resource/tracker/AbstractAverageUsageTracker.java b/server/src/main/java/org/opensearch/node/resource/tracker/AbstractAverageUsageTracker.java index f83a1b7f9fc05..69c7afc1d4b43 100644 --- a/server/src/main/java/org/opensearch/node/resource/tracker/AbstractAverageUsageTracker.java +++ b/server/src/main/java/org/opensearch/node/resource/tracker/AbstractAverageUsageTracker.java @@ -24,12 +24,12 @@ public abstract class AbstractAverageUsageTracker extends AbstractLifecycleComponent { private static final Logger LOGGER = LogManager.getLogger(AbstractAverageUsageTracker.class); - private final ThreadPool threadPool; - private final TimeValue pollingInterval; + protected final ThreadPool threadPool; + protected final TimeValue pollingInterval; private TimeValue windowDuration; private final AtomicReference observations = new AtomicReference<>(); - private volatile Scheduler.Cancellable scheduledFuture; + protected volatile Scheduler.Cancellable scheduledFuture; public AbstractAverageUsageTracker(ThreadPool threadPool, TimeValue pollingInterval, TimeValue windowDuration) { this.threadPool = threadPool; diff --git a/server/src/main/java/org/opensearch/node/resource/tracker/AverageIoUsageTracker.java b/server/src/main/java/org/opensearch/node/resource/tracker/AverageIoUsageTracker.java new file mode 100644 index 0000000000000..5472d4bda2326 --- /dev/null +++ b/server/src/main/java/org/opensearch/node/resource/tracker/AverageIoUsageTracker.java @@ -0,0 +1,102 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.node.resource.tracker; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.Constants; +import org.opensearch.common.ValidationException; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.monitor.fs.FsInfo.DeviceStats; +import org.opensearch.monitor.fs.FsService; +import org.opensearch.node.IoUsageStats; +import org.opensearch.threadpool.ThreadPool; + +import java.util.HashMap; +import java.util.Optional; + +/** + * AverageIoUsageTracker tracks the IO usage by polling the FS Stats for IO metrics every (pollingInterval) + * and keeping track of the rolling average over a defined time window (windowDuration). + */ +public class AverageIoUsageTracker extends AbstractAverageUsageTracker { + + private static final Logger LOGGER = LogManager.getLogger(AverageIoUsageTracker.class); + private final FsService fsService; + private final HashMap prevIoTimeDeviceMap; + private long prevTimeInMillis; + private IoUsageStats ioUsageStats; + + public AverageIoUsageTracker(FsService fsService, ThreadPool threadPool, TimeValue pollingInterval, TimeValue windowDuration) { + super(threadPool, pollingInterval, windowDuration); + this.fsService = fsService; + this.prevIoTimeDeviceMap = new HashMap<>(); + this.prevTimeInMillis = -1; + this.ioUsageStats = null; + } + + /** + * Get current IO usage percentage calculated using fs stats + */ + @Override + public long getUsage() { + long usage = 0; + Optional validationException = this.preValidateFsStats(); + if (validationException != null && validationException.isPresent()) { + throw validationException.get(); + } + // Currently even during the raid setup we have only one mount device and it is giving 0 io time from /proc/diskstats + DeviceStats[] devicesStats = fsService.stats().getIoStats().getDevicesStats(); + long latestTimeInMillis = fsService.stats().getTimestamp(); + for (DeviceStats devicesStat : devicesStats) { + long devicePreviousIoTime = prevIoTimeDeviceMap.getOrDefault(devicesStat.getDeviceName(), (long) -1); + long deviceCurrentIoTime = devicesStat.ioTimeInMillis(); + if (prevTimeInMillis > 0 && (latestTimeInMillis - this.prevTimeInMillis > 0) && devicePreviousIoTime > 0) { + long absIoTime = (deviceCurrentIoTime - devicePreviousIoTime); + long deviceCurrentIoUsage = absIoTime * 100 / (latestTimeInMillis - this.prevTimeInMillis); + // We are returning the maximum IO Usage for all the attached devices + usage = Math.max(usage, deviceCurrentIoUsage); + } + prevIoTimeDeviceMap.put(devicesStat.getDeviceName(), devicesStat.ioTimeInMillis()); + } + this.prevTimeInMillis = latestTimeInMillis; + return usage; + } + + @Override + protected void doStart() { + if (Constants.LINUX) { + this.ioUsageStats = new IoUsageStats(-1); + scheduledFuture = threadPool.scheduleWithFixedDelay(() -> { + long usage = getUsage(); + recordUsage(usage); + updateIoUsageStats(); + }, pollingInterval, ThreadPool.Names.GENERIC); + } + } + + public Optional preValidateFsStats() { + ValidationException validationException = new ValidationException(); + if (fsService == null + || fsService.stats() == null + || fsService.stats().getIoStats() == null + || fsService.stats().getIoStats().getDevicesStats() == null) { + validationException.addValidationError("FSService IoStats Or DeviceStats are Missing"); + } + return validationException.validationErrors().isEmpty() ? Optional.empty() : Optional.of(validationException); + } + + private void updateIoUsageStats() { + this.ioUsageStats.setIoUtilisationPercent(this.isReady() ? this.getAverage() : -1); + } + + public IoUsageStats getIoUsageStats() { + return this.ioUsageStats; + } +} diff --git a/server/src/main/java/org/opensearch/node/resource/tracker/NodeResourceUsageTracker.java b/server/src/main/java/org/opensearch/node/resource/tracker/NodeResourceUsageTracker.java index cf5f38c1b004c..546ae07cde221 100644 --- a/server/src/main/java/org/opensearch/node/resource/tracker/NodeResourceUsageTracker.java +++ b/server/src/main/java/org/opensearch/node/resource/tracker/NodeResourceUsageTracker.java @@ -12,6 +12,8 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.monitor.fs.FsService; +import org.opensearch.node.IoUsageStats; import org.opensearch.threadpool.ThreadPool; /** @@ -22,10 +24,14 @@ public class NodeResourceUsageTracker extends AbstractLifecycleComponent { private final ClusterSettings clusterSettings; private AverageCpuUsageTracker cpuUsageTracker; private AverageMemoryUsageTracker memoryUsageTracker; + private AverageIoUsageTracker ioUsageTracker; private ResourceTrackerSettings resourceTrackerSettings; - public NodeResourceUsageTracker(ThreadPool threadPool, Settings settings, ClusterSettings clusterSettings) { + private final FsService fsService; + + public NodeResourceUsageTracker(FsService fsService, ThreadPool threadPool, Settings settings, ClusterSettings clusterSettings) { + this.fsService = fsService; this.threadPool = threadPool; this.clusterSettings = clusterSettings; this.resourceTrackerSettings = new ResourceTrackerSettings(settings); @@ -52,6 +58,13 @@ public double getMemoryUtilizationPercent() { return 0.0; } + /** + * Return io stats average if we have enough datapoints, otherwise return 0 + */ + public IoUsageStats getIoUsageStats() { + return ioUsageTracker.getIoUsageStats(); + } + /** * Checks if all of the resource usage trackers are ready */ @@ -79,6 +92,17 @@ void initialize() { ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING, this::setMemoryWindowDuration ); + + ioUsageTracker = new AverageIoUsageTracker( + fsService, + threadPool, + resourceTrackerSettings.getIoPollingInterval(), + resourceTrackerSettings.getIoWindowDuration() + ); + clusterSettings.addSettingsUpdateConsumer( + ResourceTrackerSettings.GLOBAL_IO_USAGE_AC_WINDOW_DURATION_SETTING, + this::setIoWindowDuration + ); } private void setMemoryWindowDuration(TimeValue windowDuration) { @@ -91,6 +115,11 @@ private void setCpuWindowDuration(TimeValue windowDuration) { resourceTrackerSettings.setCpuWindowDuration(windowDuration); } + private void setIoWindowDuration(TimeValue windowDuration) { + ioUsageTracker.setWindowSize(windowDuration); + resourceTrackerSettings.setIoWindowDuration(windowDuration); + } + /** * Visible for testing */ @@ -102,17 +131,20 @@ ResourceTrackerSettings getResourceTrackerSettings() { protected void doStart() { cpuUsageTracker.doStart(); memoryUsageTracker.doStart(); + ioUsageTracker.doStart(); } @Override protected void doStop() { cpuUsageTracker.doStop(); memoryUsageTracker.doStop(); + ioUsageTracker.doStop(); } @Override protected void doClose() { cpuUsageTracker.doClose(); memoryUsageTracker.doClose(); + ioUsageTracker.doClose(); } } diff --git a/server/src/main/java/org/opensearch/node/resource/tracker/ResourceTrackerSettings.java b/server/src/main/java/org/opensearch/node/resource/tracker/ResourceTrackerSettings.java index f81b008ba7e8b..b423b92c8a4fb 100644 --- a/server/src/main/java/org/opensearch/node/resource/tracker/ResourceTrackerSettings.java +++ b/server/src/main/java/org/opensearch/node/resource/tracker/ResourceTrackerSettings.java @@ -26,6 +26,14 @@ private static class Defaults { * This is the default window duration on which the average resource utilization values will be calculated */ private static final long WINDOW_DURATION_IN_SECONDS = 30; + /** + * This is the default polling interval for IO usage tracker + */ + private static final long IO_POLLING_INTERVAL_IN_MILLIS = 5000; + /** + * This is the default window duration for IO usage tracker on which the average resource utilization values will be calculated + */ + private static final long IO_WINDOW_DURATION_IN_SECONDS = 120; } public static final Setting GLOBAL_CPU_USAGE_AC_POLLING_INTERVAL_SETTING = Setting.positiveTimeSetting( @@ -40,6 +48,18 @@ private static class Defaults { Setting.Property.NodeScope ); + public static final Setting GLOBAL_IO_USAGE_AC_POLLING_INTERVAL_SETTING = Setting.positiveTimeSetting( + "node.resource.tracker.global_io_usage.polling_interval", + TimeValue.timeValueMillis(Defaults.IO_POLLING_INTERVAL_IN_MILLIS), + Setting.Property.NodeScope + ); + public static final Setting GLOBAL_IO_USAGE_AC_WINDOW_DURATION_SETTING = Setting.positiveTimeSetting( + "node.resource.tracker.global_io_usage.window_duration", + TimeValue.timeValueSeconds(Defaults.IO_WINDOW_DURATION_IN_SECONDS), + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + public static final Setting GLOBAL_JVM_USAGE_AC_POLLING_INTERVAL_SETTING = Setting.positiveTimeSetting( "node.resource.tracker.global_jvmmp.polling_interval", TimeValue.timeValueMillis(Defaults.POLLING_INTERVAL_IN_MILLIS), @@ -56,12 +76,16 @@ private static class Defaults { private volatile TimeValue cpuPollingInterval; private volatile TimeValue memoryWindowDuration; private volatile TimeValue memoryPollingInterval; + private volatile TimeValue ioWindowDuration; + private volatile TimeValue ioPollingInterval; public ResourceTrackerSettings(Settings settings) { this.cpuPollingInterval = GLOBAL_CPU_USAGE_AC_POLLING_INTERVAL_SETTING.get(settings); this.cpuWindowDuration = GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.get(settings); this.memoryPollingInterval = GLOBAL_JVM_USAGE_AC_POLLING_INTERVAL_SETTING.get(settings); this.memoryWindowDuration = GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.get(settings); + this.ioPollingInterval = GLOBAL_IO_USAGE_AC_POLLING_INTERVAL_SETTING.get(settings); + this.ioWindowDuration = GLOBAL_IO_USAGE_AC_WINDOW_DURATION_SETTING.get(settings); } public TimeValue getCpuWindowDuration() { @@ -80,6 +104,14 @@ public TimeValue getMemoryWindowDuration() { return memoryWindowDuration; } + public TimeValue getIoPollingInterval() { + return ioPollingInterval; + } + + public TimeValue getIoWindowDuration() { + return ioWindowDuration; + } + public void setCpuWindowDuration(TimeValue cpuWindowDuration) { this.cpuWindowDuration = cpuWindowDuration; } @@ -87,4 +119,8 @@ public void setCpuWindowDuration(TimeValue cpuWindowDuration) { public void setMemoryWindowDuration(TimeValue memoryWindowDuration) { this.memoryWindowDuration = memoryWindowDuration; } + + public void setIoWindowDuration(TimeValue ioWindowDuration) { + this.ioWindowDuration = ioWindowDuration; + } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index 05bb2706e1b03..af76292e83d61 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -65,6 +65,7 @@ import org.opensearch.monitor.os.OsStats; import org.opensearch.monitor.process.ProcessStats; import org.opensearch.node.AdaptiveSelectionStats; +import org.opensearch.node.IoUsageStats; import org.opensearch.node.NodeResourceUsageStats; import org.opensearch.node.NodesResourceUsageStats; import org.opensearch.node.ResponseCollectorService; @@ -881,7 +882,8 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) throws IOExcep nodeId, System.currentTimeMillis(), randomDoubleBetween(1.0, 100.0, true), - randomDoubleBetween(1.0, 100.0, true) + randomDoubleBetween(1.0, 100.0, true), + new IoUsageStats(100.0) ); resourceUsageStatsMap.put(nodeId, stats); } diff --git a/server/src/test/java/org/opensearch/node/IoUsageStatsTests.java b/server/src/test/java/org/opensearch/node/IoUsageStatsTests.java new file mode 100644 index 0000000000000..4a4de44e3acea --- /dev/null +++ b/server/src/test/java/org/opensearch/node/IoUsageStatsTests.java @@ -0,0 +1,57 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.node; + +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.Locale; + +public class IoUsageStatsTests extends OpenSearchTestCase { + IoUsageStats ioUsageStats; + + @Override + public void setUp() throws Exception { + super.setUp(); + ioUsageStats = new IoUsageStats(10); + } + + public void testDefaultsIoUsageStats() { + assertEquals(ioUsageStats.getIoUtilisationPercent(), 10.0, 0); + } + + public void testUpdateIoUsageStats() { + assertEquals(ioUsageStats.getIoUtilisationPercent(), 10.0, 0); + ioUsageStats.setIoUtilisationPercent(20); + assertEquals(ioUsageStats.getIoUtilisationPercent(), 20.0, 0); + } + + public void testIoUsageStats() throws IOException { + XContentBuilder builder = JsonXContent.contentBuilder(); + builder = ioUsageStats.toXContent(builder, ToXContent.EMPTY_PARAMS); + String response = builder.toString(); + assertEquals(response, "{\"max_io_utilization_percent\":\"10.0\"}"); + ioUsageStats.setIoUtilisationPercent(20); + builder = JsonXContent.contentBuilder(); + builder = ioUsageStats.toXContent(builder, ToXContent.EMPTY_PARAMS); + response = builder.toString(); + assertEquals(response, "{\"max_io_utilization_percent\":\"20.0\"}"); + } + + public void testIoUsageStatsToString() { + String expected = "IO utilization percent: " + String.format(Locale.ROOT, "%.1f", 10.0); + assertEquals(expected, ioUsageStats.toString()); + ioUsageStats.setIoUtilisationPercent(20); + expected = "IO utilization percent: " + String.format(Locale.ROOT, "%.1f", 20.0); + assertEquals(expected, ioUsageStats.toString()); + } +} diff --git a/server/src/test/java/org/opensearch/node/ResourceUsageCollectorServiceTests.java b/server/src/test/java/org/opensearch/node/ResourceUsageCollectorServiceTests.java index b2fa884afab69..f2ee0e61c4953 100644 --- a/server/src/test/java/org/opensearch/node/ResourceUsageCollectorServiceTests.java +++ b/server/src/test/java/org/opensearch/node/ResourceUsageCollectorServiceTests.java @@ -57,6 +57,7 @@ public void setUp() throws Exception { .put(ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), new TimeValue(500, TimeUnit.MILLISECONDS)) .build(); tracker = new NodeResourceUsageTracker( + null, threadpool, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) @@ -78,17 +79,19 @@ public void tearDown() throws Exception { } public void testResourceUsageStats() { - collector.collectNodeResourceUsageStats("node1", System.currentTimeMillis(), 97, 99); + collector.collectNodeResourceUsageStats("node1", System.currentTimeMillis(), 97, 99, new IoUsageStats(98)); Map nodeStats = collector.getAllNodeStatistics(); assertTrue(nodeStats.containsKey("node1")); assertEquals(99.0, nodeStats.get("node1").cpuUtilizationPercent, 0.0); assertEquals(97.0, nodeStats.get("node1").memoryUtilizationPercent, 0.0); + assertEquals(98, nodeStats.get("node1").getIoUsageStats().getIoUtilisationPercent(), 0.0); Optional nodeResourceUsageStatsOptional = collector.getNodeStatistics("node1"); assertNotNull(nodeResourceUsageStatsOptional.get()); assertEquals(99.0, nodeResourceUsageStatsOptional.get().cpuUtilizationPercent, 0.0); assertEquals(97.0, nodeResourceUsageStatsOptional.get().memoryUtilizationPercent, 0.0); + assertEquals(98, nodeResourceUsageStatsOptional.get().getIoUsageStats().getIoUtilisationPercent(), 0.0); nodeResourceUsageStatsOptional = collector.getNodeStatistics("node2"); assertTrue(nodeResourceUsageStatsOptional.isEmpty()); @@ -137,7 +140,8 @@ public void testConcurrentAddingAndRemovingNodes() throws Exception { randomFrom(nodes), System.currentTimeMillis(), randomIntBetween(1, 100), - randomIntBetween(1, 100) + randomIntBetween(1, 100), + new IoUsageStats(randomIntBetween(1, 100)) ); } }; @@ -162,13 +166,26 @@ public void testConcurrentAddingAndRemovingNodes() throws Exception { if (nodeStats.containsKey(nodeId)) { assertThat(nodeStats.get(nodeId).memoryUtilizationPercent, greaterThan(0.0)); assertThat(nodeStats.get(nodeId).cpuUtilizationPercent, greaterThan(0.0)); + assertThat(nodeStats.get(nodeId).getIoUsageStats().getIoUtilisationPercent(), greaterThan(0.0)); } } } public void testNodeRemoval() { - collector.collectNodeResourceUsageStats("node1", System.currentTimeMillis(), randomIntBetween(1, 100), randomIntBetween(1, 100)); - collector.collectNodeResourceUsageStats("node2", System.currentTimeMillis(), randomIntBetween(1, 100), randomIntBetween(1, 100)); + collector.collectNodeResourceUsageStats( + "node1", + System.currentTimeMillis(), + randomIntBetween(1, 100), + randomIntBetween(1, 100), + new IoUsageStats(randomIntBetween(1, 100)) + ); + collector.collectNodeResourceUsageStats( + "node2", + System.currentTimeMillis(), + randomIntBetween(1, 100), + randomIntBetween(1, 100), + new IoUsageStats(randomIntBetween(1, 100)) + ); ClusterState previousState = ClusterState.builder(new ClusterName("cluster")) .nodes( diff --git a/server/src/test/java/org/opensearch/node/resource/tracker/AverageUsageTrackerTests.java b/server/src/test/java/org/opensearch/node/resource/tracker/AverageUsageTrackerTests.java index 374c993a264d4..49a58991e8e5c 100644 --- a/server/src/test/java/org/opensearch/node/resource/tracker/AverageUsageTrackerTests.java +++ b/server/src/test/java/org/opensearch/node/resource/tracker/AverageUsageTrackerTests.java @@ -8,15 +8,22 @@ package org.opensearch.node.resource.tracker; +import org.opensearch.common.ValidationException; import org.opensearch.common.unit.TimeValue; +import org.opensearch.monitor.fs.FsInfo; +import org.opensearch.monitor.fs.FsService; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.junit.After; import org.junit.Before; +import java.util.Optional; import java.util.concurrent.TimeUnit; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + /** * Tests to validate AverageMemoryUsageTracker and AverageCpuUsageTracker implementation */ @@ -24,10 +31,12 @@ public class AverageUsageTrackerTests extends OpenSearchTestCase { ThreadPool threadPool; AverageMemoryUsageTracker averageMemoryUsageTracker; AverageCpuUsageTracker averageCpuUsageTracker; + AverageIoUsageTracker averageIoUsageTracker; @Before public void setup() { threadPool = new TestThreadPool(getClass().getName()); + FsService fsService = mock(FsService.class); averageMemoryUsageTracker = new AverageMemoryUsageTracker( threadPool, new TimeValue(500, TimeUnit.MILLISECONDS), @@ -38,6 +47,12 @@ public void setup() { new TimeValue(500, TimeUnit.MILLISECONDS), new TimeValue(1000, TimeUnit.MILLISECONDS) ); + averageIoUsageTracker = new AverageIoUsageTracker( + fsService, + threadPool, + new TimeValue(500, TimeUnit.MILLISECONDS), + new TimeValue(1000, TimeUnit.MILLISECONDS) + ); } @After @@ -46,14 +61,15 @@ public void cleanup() { } public void testBasicUsage() { - assertAverageUsageStats(averageMemoryUsageTracker); assertAverageUsageStats(averageCpuUsageTracker); + assertAverageUsageStats(averageIoUsageTracker); } public void testUpdateWindowSize() { assertUpdateWindowSize(averageMemoryUsageTracker); assertUpdateWindowSize(averageCpuUsageTracker); + assertUpdateWindowSize(averageIoUsageTracker); } private void assertAverageUsageStats(AbstractAverageUsageTracker usageTracker) { @@ -96,4 +112,24 @@ private void assertUpdateWindowSize(AbstractAverageUsageTracker usageTracker) { // ( 2 + 1 + 2 + 2 ) / 4 = 1.75 assertEquals(1.75, usageTracker.getAverage(), 0.0); } + + public void testPreValidationForIOTracker() { + Optional validationException = averageIoUsageTracker.preValidateFsStats(); + assertTrue(validationException.isPresent()); + FsService fsService = mock(FsService.class); + FsInfo fsInfo = mock(FsInfo.class); + FsInfo.IoStats ioStats = mock(FsInfo.IoStats.class); + when(fsService.stats()).thenReturn(fsInfo); + when(fsInfo.getIoStats()).thenReturn(ioStats); + FsInfo.DeviceStats[] deviceStats = new FsInfo.DeviceStats[0]; + when(fsService.stats().getIoStats().getDevicesStats()).thenReturn(deviceStats); + averageIoUsageTracker = new AverageIoUsageTracker( + fsService, + threadPool, + new TimeValue(500, TimeUnit.MILLISECONDS), + new TimeValue(1000, TimeUnit.MILLISECONDS) + ); + validationException = averageIoUsageTracker.preValidateFsStats(); + assertFalse(validationException.isPresent()); + } } diff --git a/server/src/test/java/org/opensearch/node/resource/tracker/NodeResourceUsageTrackerTests.java b/server/src/test/java/org/opensearch/node/resource/tracker/NodeResourceUsageTrackerTests.java index 1ce68b9f29062..191b09331f111 100644 --- a/server/src/test/java/org/opensearch/node/resource/tracker/NodeResourceUsageTrackerTests.java +++ b/server/src/test/java/org/opensearch/node/resource/tracker/NodeResourceUsageTrackerTests.java @@ -12,6 +12,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.monitor.fs.FsService; import org.opensearch.test.OpenSearchSingleNodeTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -22,6 +23,7 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.greaterThan; +import static org.mockito.Mockito.mock; /** * Tests to assert resource usage trackers retrieving resource utilization averages @@ -51,6 +53,7 @@ public void testStats() throws Exception { .put(ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), new TimeValue(500, TimeUnit.MILLISECONDS)) .build(); NodeResourceUsageTracker tracker = new NodeResourceUsageTracker( + mock(FsService.class), threadPool, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) @@ -67,6 +70,7 @@ public void testStats() throws Exception { public void testUpdateSettings() { NodeResourceUsageTracker tracker = new NodeResourceUsageTracker( + mock(FsService.class), threadPool, Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) @@ -74,6 +78,7 @@ public void testUpdateSettings() { assertEquals(tracker.getResourceTrackerSettings().getCpuWindowDuration().getSeconds(), 30); assertEquals(tracker.getResourceTrackerSettings().getMemoryWindowDuration().getSeconds(), 30); + assertEquals(tracker.getResourceTrackerSettings().getIoWindowDuration().getSeconds(), 120); Settings settings = Settings.builder() .put(ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), "10s") @@ -92,5 +97,13 @@ public void testUpdateSettings() { "5s", response.getPersistentSettings().get(ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.getKey()) ); + Settings ioSettings = Settings.builder() + .put(ResourceTrackerSettings.GLOBAL_IO_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), "20s") + .build(); + response = client().admin().cluster().prepareUpdateSettings().setPersistentSettings(ioSettings).get(); + assertEquals( + "20s", + response.getPersistentSettings().get(ResourceTrackerSettings.GLOBAL_IO_USAGE_AC_WINDOW_DURATION_SETTING.getKey()) + ); } }