Skip to content

Commit

Permalink
[Backport 2.x] Integrate IO Usage Tracker to the Resource Usage Colle…
Browse files Browse the repository at this point in the history
…ctor Service and Emit IO Usage Stats #11880 (#12558)

* Integrate IO Usage Tracker to the Resource Usage Collector Service and Emit IO Usage Stats (#11880)

* Updating OS Version to 2_13

Signed-off-by: Ajay Kumar Movva <movvaam@amazon.com>
  • Loading branch information
ajaymovva committed Mar 8, 2024
1 parent 194f357 commit 8a06c53
Show file tree
Hide file tree
Showing 17 changed files with 424 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Support for returning scores in matched queries ([#11626](https://github.com/opensearch-project/OpenSearch/pull/11626))
- Add shard id property to SearchLookup for use in field types provided by plugins ([#1063](https://github.com/opensearch-project/OpenSearch/pull/1063))
- Add kuromoji_completion analyzer and filter ([#4835](https://github.com/opensearch-project/OpenSearch/issues/4835))
- [Admission Control] Integrate IO Usage Tracker to the Resource Usage Collector Service and Emit IO Usage Stats ([#11880](https://github.com/opensearch-project/OpenSearch/pull/11880))

### Dependencies
- Bump `com.squareup.okio:okio` from 3.7.0 to 3.8.0 ([#12290](https://github.com/opensearch-project/OpenSearch/pull/12290))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,7 @@ public void apply(Settings value, Settings current, Settings previous) {
// Settings related to resource trackers
ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING,
ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING,
ResourceTrackerSettings.GLOBAL_IO_USAGE_AC_WINDOW_DURATION_SETTING,

// Settings related to Searchable Snapshots
Node.NODE_SEARCH_CACHE_SIZE_SETTING,
Expand Down
6 changes: 5 additions & 1 deletion server/src/main/java/org/opensearch/monitor/fs/FsInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -450,9 +450,13 @@ public long ioTimeInMillis() {
return (currentIOTime - previousIOTime);
}

public String getDeviceName() {
return deviceName;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("device_name", deviceName);
builder.field("device_name", getDeviceName());
builder.field(IoStats.OPERATIONS, operations());
builder.field(IoStats.READ_OPERATIONS, readOperations());
builder.field(IoStats.WRITE_OPERATIONS, writeOperations());
Expand Down
69 changes: 69 additions & 0 deletions server/src/main/java/org/opensearch/node/IoUsageStats.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.node;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Locale;

/**
* This class is to store tne IO Usage Stats and used to return in node stats API.
*/
public class IoUsageStats implements Writeable, ToXContentFragment {

private double ioUtilisationPercent;

public IoUsageStats(double ioUtilisationPercent) {
this.ioUtilisationPercent = ioUtilisationPercent;
}

/**
*
* @param in the stream to read from
* @throws IOException if an error occurs while reading from the StreamOutput
*/
public IoUsageStats(StreamInput in) throws IOException {
this.ioUtilisationPercent = in.readDouble();
}

/**
* Write this into the {@linkplain StreamOutput}.
*
* @param out the output stream to write entity content to
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeDouble(this.ioUtilisationPercent);
}

public double getIoUtilisationPercent() {
return ioUtilisationPercent;
}

public void setIoUtilisationPercent(double ioUtilisationPercent) {
this.ioUtilisationPercent = ioUtilisationPercent;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("max_io_utilization_percent", String.format(Locale.ROOT, "%.1f", this.ioUtilisationPercent));
return builder.endObject();
}

@Override
public String toString() {
return "IO utilization percent: " + String.format(Locale.ROOT, "%.1f", this.ioUtilisationPercent);
}
}
1 change: 1 addition & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,7 @@ protected Node(
final RestController restController = actionModule.getRestController();

final NodeResourceUsageTracker nodeResourceUsageTracker = new NodeResourceUsageTracker(
monitorService.fsService(),
threadPool,
settings,
clusterService.getClusterSettings()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.node;

import org.opensearch.Version;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
Expand All @@ -24,19 +25,32 @@ public class NodeResourceUsageStats implements Writeable {
long timestamp;
double cpuUtilizationPercent;
double memoryUtilizationPercent;
private IoUsageStats ioUsageStats;

public NodeResourceUsageStats(String nodeId, long timestamp, double memoryUtilizationPercent, double cpuUtilizationPercent) {
public NodeResourceUsageStats(
String nodeId,
long timestamp,
double memoryUtilizationPercent,
double cpuUtilizationPercent,
IoUsageStats ioUsageStats
) {
this.nodeId = nodeId;
this.timestamp = timestamp;
this.cpuUtilizationPercent = cpuUtilizationPercent;
this.memoryUtilizationPercent = memoryUtilizationPercent;
this.ioUsageStats = ioUsageStats;
}

public NodeResourceUsageStats(StreamInput in) throws IOException {
this.nodeId = in.readString();
this.timestamp = in.readLong();
this.cpuUtilizationPercent = in.readDouble();
this.memoryUtilizationPercent = in.readDouble();
if (in.getVersion().onOrAfter(Version.V_2_13_0)) {
this.ioUsageStats = in.readOptionalWriteable(IoUsageStats::new);
} else {
this.ioUsageStats = null;
}
}

@Override
Expand All @@ -45,15 +59,21 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(this.timestamp);
out.writeDouble(this.cpuUtilizationPercent);
out.writeDouble(this.memoryUtilizationPercent);
if (out.getVersion().onOrAfter(Version.V_2_13_0)) {
out.writeOptionalWriteable(this.ioUsageStats);
}
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder("NodeResourceUsageStats[");
sb.append(nodeId).append("](");
sb.append("Timestamp: ").append(timestamp);
sb.append(", CPU utilization percent: ").append(String.format(Locale.ROOT, "%.1f", cpuUtilizationPercent));
sb.append(", Memory utilization percent: ").append(String.format(Locale.ROOT, "%.1f", memoryUtilizationPercent));
sb.append(", CPU utilization percent: ").append(String.format(Locale.ROOT, "%.1f", this.getCpuUtilizationPercent()));
sb.append(", Memory utilization percent: ").append(String.format(Locale.ROOT, "%.1f", this.getMemoryUtilizationPercent()));
if (this.ioUsageStats != null) {
sb.append(", ").append(this.getIoUsageStats());
}
sb.append(")");
return sb.toString();
}
Expand All @@ -63,7 +83,8 @@ public String toString() {
nodeResourceUsageStats.nodeId,
nodeResourceUsageStats.timestamp,
nodeResourceUsageStats.memoryUtilizationPercent,
nodeResourceUsageStats.cpuUtilizationPercent
nodeResourceUsageStats.cpuUtilizationPercent,
nodeResourceUsageStats.ioUsageStats
);
}

Expand All @@ -75,6 +96,14 @@ public double getCpuUtilizationPercent() {
return cpuUtilizationPercent;
}

public IoUsageStats getIoUsageStats() {
return ioUsageStats;
}

public void setIoUsageStats(IoUsageStats ioUsageStats) {
this.ioUsageStats = ioUsageStats;
}

public long getTimestamp() {
return timestamp;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
"memory_utilization_percent",
String.format(Locale.ROOT, "%.1f", resourceUsageStats.memoryUtilizationPercent)
);
if (resourceUsageStats.getIoUsageStats() != null) {
builder.field("io_usage_stats", resourceUsageStats.getIoUsageStats());
}
}
builder.endObject();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,16 @@ public void collectNodeResourceUsageStats(
String nodeId,
long timestamp,
double memoryUtilizationPercent,
double cpuUtilizationPercent
double cpuUtilizationPercent,
IoUsageStats ioUsageStats
) {
nodeIdToResourceUsageStats.compute(nodeId, (id, resourceUsageStats) -> {
if (resourceUsageStats == null) {
return new NodeResourceUsageStats(nodeId, timestamp, memoryUtilizationPercent, cpuUtilizationPercent);
return new NodeResourceUsageStats(nodeId, timestamp, memoryUtilizationPercent, cpuUtilizationPercent, ioUsageStats);
} else {
resourceUsageStats.cpuUtilizationPercent = cpuUtilizationPercent;
resourceUsageStats.memoryUtilizationPercent = memoryUtilizationPercent;
resourceUsageStats.setIoUsageStats(ioUsageStats);
resourceUsageStats.timestamp = timestamp;
return resourceUsageStats;
}
Expand Down Expand Up @@ -129,7 +131,8 @@ private void collectLocalNodeResourceUsageStats() {
clusterService.state().nodes().getLocalNodeId(),
System.currentTimeMillis(),
nodeResourceUsageTracker.getMemoryUtilizationPercent(),
nodeResourceUsageTracker.getCpuUtilizationPercent()
nodeResourceUsageTracker.getCpuUtilizationPercent(),
nodeResourceUsageTracker.getIoUsageStats()
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
public abstract class AbstractAverageUsageTracker extends AbstractLifecycleComponent {
private static final Logger LOGGER = LogManager.getLogger(AbstractAverageUsageTracker.class);

private final ThreadPool threadPool;
private final TimeValue pollingInterval;
protected final ThreadPool threadPool;
protected final TimeValue pollingInterval;
private TimeValue windowDuration;
private final AtomicReference<MovingAverage> observations = new AtomicReference<>();

private volatile Scheduler.Cancellable scheduledFuture;
protected volatile Scheduler.Cancellable scheduledFuture;

public AbstractAverageUsageTracker(ThreadPool threadPool, TimeValue pollingInterval, TimeValue windowDuration) {
this.threadPool = threadPool;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.node.resource.tracker;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.Constants;
import org.opensearch.common.ValidationException;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.monitor.fs.FsInfo.DeviceStats;
import org.opensearch.monitor.fs.FsService;
import org.opensearch.node.IoUsageStats;
import org.opensearch.threadpool.ThreadPool;

import java.util.HashMap;
import java.util.Optional;

/**
* AverageIoUsageTracker tracks the IO usage by polling the FS Stats for IO metrics every (pollingInterval)
* and keeping track of the rolling average over a defined time window (windowDuration).
*/
public class AverageIoUsageTracker extends AbstractAverageUsageTracker {

private static final Logger LOGGER = LogManager.getLogger(AverageIoUsageTracker.class);
private final FsService fsService;
private final HashMap<String, Long> prevIoTimeDeviceMap;
private long prevTimeInMillis;
private IoUsageStats ioUsageStats;

public AverageIoUsageTracker(FsService fsService, ThreadPool threadPool, TimeValue pollingInterval, TimeValue windowDuration) {
super(threadPool, pollingInterval, windowDuration);
this.fsService = fsService;
this.prevIoTimeDeviceMap = new HashMap<>();
this.prevTimeInMillis = -1;
this.ioUsageStats = null;
}

/**
* Get current IO usage percentage calculated using fs stats
*/
@Override
public long getUsage() {
long usage = 0;
Optional<ValidationException> validationException = this.preValidateFsStats();
if (validationException != null && validationException.isPresent()) {
throw validationException.get();
}
// Currently even during the raid setup we have only one mount device and it is giving 0 io time from /proc/diskstats
DeviceStats[] devicesStats = fsService.stats().getIoStats().getDevicesStats();
long latestTimeInMillis = fsService.stats().getTimestamp();
for (DeviceStats devicesStat : devicesStats) {
long devicePreviousIoTime = prevIoTimeDeviceMap.getOrDefault(devicesStat.getDeviceName(), (long) -1);
long deviceCurrentIoTime = devicesStat.ioTimeInMillis();
if (prevTimeInMillis > 0 && (latestTimeInMillis - this.prevTimeInMillis > 0) && devicePreviousIoTime > 0) {
long absIoTime = (deviceCurrentIoTime - devicePreviousIoTime);
long deviceCurrentIoUsage = absIoTime * 100 / (latestTimeInMillis - this.prevTimeInMillis);
// We are returning the maximum IO Usage for all the attached devices
usage = Math.max(usage, deviceCurrentIoUsage);
}
prevIoTimeDeviceMap.put(devicesStat.getDeviceName(), devicesStat.ioTimeInMillis());
}
this.prevTimeInMillis = latestTimeInMillis;
return usage;
}

@Override
protected void doStart() {
if (Constants.LINUX) {
this.ioUsageStats = new IoUsageStats(-1);
scheduledFuture = threadPool.scheduleWithFixedDelay(() -> {
long usage = getUsage();
recordUsage(usage);
updateIoUsageStats();
}, pollingInterval, ThreadPool.Names.GENERIC);
}
}

public Optional<ValidationException> preValidateFsStats() {
ValidationException validationException = new ValidationException();
if (fsService == null
|| fsService.stats() == null
|| fsService.stats().getIoStats() == null
|| fsService.stats().getIoStats().getDevicesStats() == null) {
validationException.addValidationError("FSService IoStats Or DeviceStats are Missing");
}
return validationException.validationErrors().isEmpty() ? Optional.empty() : Optional.of(validationException);
}

private void updateIoUsageStats() {
this.ioUsageStats.setIoUtilisationPercent(this.isReady() ? this.getAverage() : -1);
}

public IoUsageStats getIoUsageStats() {
return this.ioUsageStats;
}
}
Loading

0 comments on commit 8a06c53

Please sign in to comment.