Skip to content

Commit

Permalink
Fix coordination request and document rejection metrics (elastic#107915)
Browse files Browse the repository at this point in the history
* Fix coordination request rejection ratio metric

* change test

* fix metrics

* minor tuning

* Publish plain counters, do aggregation in kibana
  • Loading branch information
volodk85 authored May 7, 2024
1 parent 0117ea1 commit 846b74f
Show file tree
Hide file tree
Showing 7 changed files with 327 additions and 143 deletions.

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions server/src/main/java/org/elasticsearch/TransportVersions.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ static TransportVersion def(int id) {
public static final TransportVersion ROLE_REMOTE_CLUSTER_PRIVS = def(8_649_00_0);
public static final TransportVersion NO_GLOBAL_RETENTION_FOR_SYSTEM_DATA_STREAMS = def(8_650_00_0);
public static final TransportVersion SHUTDOWN_REQUEST_TIMEOUTS_FIX = def(8_651_00_0);
public static final TransportVersion INDEXING_PRESSURE_REQUEST_REJECTIONS_COUNT = def(8_652_00_0);

/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,7 @@ static class IndexPressureStats implements ToXContentFragment {
long memoryLimit = 0;

long totalCoordinatingOps = 0;
long totalCoordinatingRequests = 0;
long totalPrimaryOps = 0;
long totalReplicaOps = 0;
long currentCoordinatingOps = 0;
Expand Down Expand Up @@ -813,6 +814,7 @@ static class IndexPressureStats implements ToXContentFragment {
currentPrimaryOps += nodeStatIndexingPressureStats.getCurrentPrimaryOps();
currentReplicaOps += nodeStatIndexingPressureStats.getCurrentReplicaOps();
primaryDocumentRejections += nodeStatIndexingPressureStats.getPrimaryDocumentRejections();
totalCoordinatingRequests += nodeStatIndexingPressureStats.getTotalCoordinatingRequests();
}
}
indexingPressureStats = new IndexingPressureStats(
Expand All @@ -834,7 +836,8 @@ static class IndexPressureStats implements ToXContentFragment {
currentCoordinatingOps,
currentPrimaryOps,
currentReplicaOps,
primaryDocumentRejections
primaryDocumentRejections,
totalCoordinatingRequests
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class IndexingPressure {
private final AtomicLong totalReplicaBytes = new AtomicLong(0);

private final AtomicLong totalCoordinatingOps = new AtomicLong(0);
private final AtomicLong totalCoordinatingRequests = new AtomicLong(0);
private final AtomicLong totalPrimaryOps = new AtomicLong(0);
private final AtomicLong totalReplicaOps = new AtomicLong(0);

Expand Down Expand Up @@ -109,6 +110,7 @@ public Releasable markCoordinatingOperationStarted(int operations, long bytes, b
totalCombinedCoordinatingAndPrimaryBytes.getAndAdd(bytes);
totalCoordinatingBytes.getAndAdd(bytes);
totalCoordinatingOps.getAndAdd(operations);
totalCoordinatingRequests.getAndIncrement();
return wrapReleasable(() -> {
logger.trace(() -> Strings.format("removing [%d] coordinating operations and [%d] bytes", operations, bytes));
this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
Expand Down Expand Up @@ -221,7 +223,8 @@ public IndexingPressureStats stats() {
currentCoordinatingOps.get(),
currentPrimaryOps.get(),
currentReplicaOps.get(),
primaryDocumentRejections.get()
primaryDocumentRejections.get(),
totalCoordinatingRequests.get()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class IndexingPressureStats implements Writeable, ToXContentFragment {

// These fields will be used for additional back-pressure and metrics in the future
private final long totalCoordinatingOps;
private final long totalCoordinatingRequests;
private final long totalPrimaryOps;
private final long totalReplicaOps;
private final long currentCoordinatingOps;
Expand Down Expand Up @@ -77,6 +78,12 @@ public IndexingPressureStats(StreamInput in) throws IOException {
} else {
primaryDocumentRejections = -1L;
}

if (in.getTransportVersion().onOrAfter(TransportVersions.INDEXING_PRESSURE_REQUEST_REJECTIONS_COUNT)) {
totalCoordinatingRequests = in.readVLong();
} else {
totalCoordinatingRequests = -1L;
}
}

public IndexingPressureStats(
Expand All @@ -98,7 +105,8 @@ public IndexingPressureStats(
long currentCoordinatingOps,
long currentPrimaryOps,
long currentReplicaOps,
long primaryDocumentRejections
long primaryDocumentRejections,
long totalCoordinatingRequests
) {
this.totalCombinedCoordinatingAndPrimaryBytes = totalCombinedCoordinatingAndPrimaryBytes;
this.totalCoordinatingBytes = totalCoordinatingBytes;
Expand All @@ -121,6 +129,7 @@ public IndexingPressureStats(
this.currentReplicaOps = currentReplicaOps;

this.primaryDocumentRejections = primaryDocumentRejections;
this.totalCoordinatingRequests = totalCoordinatingRequests;
}

@Override
Expand All @@ -146,6 +155,10 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.INDEXING_PRESSURE_DOCUMENT_REJECTIONS_COUNT)) {
out.writeVLong(primaryDocumentRejections);
}

if (out.getTransportVersion().onOrAfter(TransportVersions.INDEXING_PRESSURE_REQUEST_REJECTIONS_COUNT)) {
out.writeVLong(totalCoordinatingRequests);
}
}

public long getTotalCombinedCoordinatingAndPrimaryBytes() {
Expand Down Expand Up @@ -224,6 +237,10 @@ public long getPrimaryDocumentRejections() {
return primaryDocumentRejections;
}

public long getTotalCoordinatingRequests() {
return totalCoordinatingRequests;
}

private static final String COMBINED = "combined_coordinating_and_primary";
private static final String COMBINED_IN_BYTES = "combined_coordinating_and_primary_in_bytes";
private static final String COORDINATING = "coordinating";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.elasticsearch.monitor.jvm.GcNames;
import org.elasticsearch.monitor.jvm.JvmStats;
import org.elasticsearch.node.NodeService;
import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
import org.elasticsearch.telemetry.metric.LongWithAttributes;
import org.elasticsearch.telemetry.metric.MeterRegistry;

Expand Down Expand Up @@ -529,23 +528,16 @@ private void registerAsyncMetrics(MeterRegistry registry) {
);

metrics.add(
registry.registerDoubleGauge(
"es.indexing.coordinating_operations.rejections.ratio",
"Ratio of rejected coordinating operations",
"ratio",
() -> {
var totalCoordinatingOperations = Optional.ofNullable(stats.getOrRefresh())
.map(NodeStats::getIndexingPressureStats)
.map(IndexingPressureStats::getTotalCoordinatingOps)
.orElse(0L);
var totalCoordinatingRejections = Optional.ofNullable(stats.getOrRefresh())
registry.registerLongAsyncCounter(
"es.indexing.coordinating_operations.requests.total",
"Total number of coordinating requests",
"operations",
() -> new LongWithAttributes(
Optional.ofNullable(stats.getOrRefresh())
.map(NodeStats::getIndexingPressureStats)
.map(IndexingPressureStats::getCoordinatingRejections)
.orElse(0L);
// rejections do not count towards `totalCoordinatingOperations`
var totalOps = totalCoordinatingOperations + totalCoordinatingRejections;
return new DoubleWithAttributes(totalOps != 0 ? (double) totalCoordinatingRejections / totalOps : 0.0);
}
.map(IndexingPressureStats::getTotalCoordinatingRequests)
.orElse(0L)
)
)
);

Expand Down Expand Up @@ -620,23 +612,16 @@ private void registerAsyncMetrics(MeterRegistry registry) {
);

metrics.add(
registry.registerDoubleGauge(
"es.indexing.primary_operations.document.rejections.ratio",
"Ratio of rejected primary operations",
"ratio",
() -> {
var totalPrimaryOperations = Optional.ofNullable(stats.getOrRefresh())
.map(NodeStats::getIndexingPressureStats)
.map(IndexingPressureStats::getTotalPrimaryOps)
.orElse(0L);
var totalPrimaryDocumentRejections = Optional.ofNullable(stats.getOrRefresh())
registry.registerLongAsyncCounter(
"es.indexing.primary_operations.document.rejections.total",
"Total number of rejected indexing documents",
"operations",
() -> new LongWithAttributes(
Optional.ofNullable(stats.getOrRefresh())
.map(NodeStats::getIndexingPressureStats)
.map(IndexingPressureStats::getPrimaryDocumentRejections)
.orElse(0L);
// primary document rejections do not count towards `totalPrimaryOperations`
var totalOps = totalPrimaryOperations + totalPrimaryDocumentRejections;
return new DoubleWithAttributes(totalOps != 0 ? (double) totalPrimaryDocumentRejections / totalOps : 0.0);
}
.orElse(0L)
)
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,7 @@ public static NodeStats createNodeStats() {
randomLongBetween(0, maxStatValue),
randomLongBetween(0, maxStatValue),
randomLongBetween(0, maxStatValue),
randomLongBetween(0, maxStatValue),
randomLongBetween(0, maxStatValue)
);
}
Expand Down

0 comments on commit 846b74f

Please sign in to comment.