Skip to content

Commit

Permalink
refactor producer gauges
Browse files Browse the repository at this point in the history
  • Loading branch information
moscicky committed Jul 14, 2023
1 parent d3322ac commit a75f60e
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 198 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package pl.allegro.tech.hermes.common.metric;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;

import java.util.function.ToDoubleFunction;

public class HermesGauge {
private final MeterRegistry meterRegistry;
private final HermesMetrics hermesMetrics;

public HermesGauge(MeterRegistry meterRegistry, HermesMetrics hermesMetrics) {
this.meterRegistry = meterRegistry;
this.hermesMetrics = hermesMetrics;
}

public <T> void registerGauge(String graphiteName,
String prometheusName,
T stateObj,
ToDoubleFunction<T> f,
Tags tags) {
meterRegistry.gauge(prometheusName, tags, stateObj, f);
hermesMetrics.registerGauge(graphiteName, () -> f.applyAsDouble(stateObj));
}

public <T> void registerGauge(String graphiteName,
String prometheusName,
T stateObj,
ToDoubleFunction<T> f) {
registerGauge(graphiteName, prometheusName, stateObj, f, Tags.empty());
}

public <T> void registerGauge(String name,
T stateObj,
ToDoubleFunction<T> f) {
registerGauge(name, name, stateObj, f);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -172,20 +172,6 @@ public static void close(Timer.Context... timers) {
}
}

public double getBufferTotalBytes() {
return getDoubleValue(ACK_LEADER_BUFFER_TOTAL_BYTES)
+ getDoubleValue(ACK_ALL_BUFFER_TOTAL_BYTES);
}

public double getBufferAvailablesBytes() {
return getDoubleValue(ACK_LEADER_BUFFER_AVAILABLE_BYTES)
+ getDoubleValue(ACK_ALL_BUFFER_AVAILABLE_BYTES);
}

private double getDoubleValue(String gauge) {
return (double) metricRegistry.getGauges().get(pathCompiler.compile(gauge)).getValue();
}

private Counter getInflightCounter(SubscriptionName subscription) {
return counter(Counters.INFLIGHT, subscription.getTopicName(), subscription.getName());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package pl.allegro.tech.hermes.common.metric;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;

import java.util.Map;
import java.util.function.ToDoubleFunction;

import static pl.allegro.tech.hermes.common.metric.Gauges.BACKUP_STORAGE_SIZE;

public class PersistentBufferMetrics {
private final MeterRegistry meterRegistry;
Expand All @@ -14,8 +15,8 @@ public PersistentBufferMetrics(HermesMetrics hermesMetrics, MeterRegistry meterR
this.hermesMetrics = hermesMetrics;
}

public void registerBackupStorageSizeGauge(Map<?, ?> map) {
this.hermesMetrics.registerMessageRepositorySizeGauge(map::size);
this.meterRegistry.gaugeMapSize("backup-storage.size", Tags.empty(), map);
public <T> void registerBackupStorageSizeGauge(T obj, ToDoubleFunction<T> f) {
hermesMetrics.registerMessageRepositorySizeGauge(() -> (int) f.applyAsDouble(obj));
meterRegistry.gauge(BACKUP_STORAGE_SIZE, obj, f);
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,9 @@
package pl.allegro.tech.hermes.common.metric;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

import java.util.function.ToDoubleFunction;

import static pl.allegro.tech.hermes.common.metric.Gauges.ACK_ALL;
import static pl.allegro.tech.hermes.common.metric.Gauges.ACK_ALL_BUFFER_AVAILABLE_BYTES;
Expand All @@ -30,80 +20,66 @@
import static pl.allegro.tech.hermes.common.metric.Gauges.ACK_LEADER_METADATA_AGE;
import static pl.allegro.tech.hermes.common.metric.Gauges.ACK_LEADER_RECORD_QUEUE_TIME_MAX;
import static pl.allegro.tech.hermes.common.metric.Gauges.INFLIGHT_REQUESTS;

import static pl.allegro.tech.hermes.common.metric.HermesMetrics.escapeDots;

Check warning on line 24 in hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/ProducerMetrics.java

View workflow job for this annotation

GitHub Actions / checkstyle-hermes-common

[checkstyle-hermes-common] hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/ProducerMetrics.java#L24 <com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck>

Extra separation in import group before 'pl.allegro.tech.hermes.common.metric.HermesMetrics.escapeDots'
Raw output
/home/runner/work/hermes/hermes/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/ProducerMetrics.java:24:1: warning: Extra separation in import group before 'pl.allegro.tech.hermes.common.metric.HermesMetrics.escapeDots' (com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck)

// exposes kafka producer metrics, see: https://docs.confluent.io/platform/current/kafka/monitoring.html#producer-metrics
public class ProducerMetrics {
private final HermesMetrics hermesMetrics;
private final MeterRegistry meterRegistry;
private final HermesGauge hermesGauge;

public ProducerMetrics(HermesMetrics hermesMetrics, MeterRegistry meterRegistry) {
this.hermesMetrics = hermesMetrics;
this.meterRegistry = meterRegistry;
this.hermesGauge = new HermesGauge(meterRegistry, hermesMetrics);
}

public void registerAckAllTotalBytesGauge(Producer<byte[], byte[]> producer) {
registerTotalBytesGauge(producer, ACK_ALL_BUFFER_TOTAL_BYTES);
public <T> void registerAckAllTotalBytesGauge(T stateObj, ToDoubleFunction<T> f) {
hermesGauge.registerGauge(ACK_ALL_BUFFER_TOTAL_BYTES, stateObj, f);
}

public void registerAckLeaderTotalBytesGauge(Producer<byte[], byte[]> producer) {
registerTotalBytesGauge(producer, ACK_LEADER_BUFFER_TOTAL_BYTES);
public <T> void registerAckLeaderTotalBytesGauge(T stateObj, ToDoubleFunction<T> f) {
hermesGauge.registerGauge(ACK_LEADER_BUFFER_TOTAL_BYTES, stateObj, f);
}

public void registerAckAllAvailableBytesGauge(Producer<byte[], byte[]> producer) {
registerAvailableBytesGauge(producer, ACK_ALL_BUFFER_AVAILABLE_BYTES);
public <T> void registerAckAllAvailableBytesGauge(T stateObj, ToDoubleFunction<T> f) {
hermesGauge.registerGauge(ACK_ALL_BUFFER_AVAILABLE_BYTES, stateObj, f);
}

public void registerAckLeaderAvailableBytesGauge(Producer<byte[], byte[]> producer) {
registerAvailableBytesGauge(producer, ACK_LEADER_BUFFER_AVAILABLE_BYTES);
public <T> void registerAckLeaderAvailableBytesGauge(T stateObj, ToDoubleFunction<T> f) {
hermesGauge.registerGauge(ACK_LEADER_BUFFER_AVAILABLE_BYTES, stateObj, f);
}

public void registerAckAllCompressionRateGauge(Producer<byte[], byte[]> producer) {
registerCompressionRateGauge(producer, ACK_ALL_COMPRESSION_RATE);
public <T> void registerAckAllCompressionRateGauge(T stateObj, ToDoubleFunction<T> f) {
hermesGauge.registerGauge(ACK_ALL_COMPRESSION_RATE, stateObj, f);
}

public void registerAckLeaderCompressionRateGauge(Producer<byte[], byte[]> producer) {
registerCompressionRateGauge(producer, ACK_LEADER_COMPRESSION_RATE);
public <T> void registerAckLeaderCompressionRateGauge(T stateObj, ToDoubleFunction<T> f) {
hermesGauge.registerGauge(ACK_LEADER_COMPRESSION_RATE, stateObj, f);
}

public void registerAckAllFailedBatchesGauge(Producer<byte[], byte[]> producer) {
registerFailedBatchesGauge(producer, ACK_ALL_FAILED_BATCHES_TOTAL);
public <T> void registerAckAllFailedBatchesGauge(T stateObj, ToDoubleFunction<T> f) {
hermesGauge.registerGauge(ACK_ALL_FAILED_BATCHES_TOTAL, stateObj, f);
}

public void registerAckLeaderFailedBatchesGauge(Producer<byte[], byte[]> producer) {
registerFailedBatchesGauge(producer, ACK_LEADER_FAILED_BATCHES_TOTAL);
public <T> void registerAckLeaderFailedBatchesGauge(T stateObj, ToDoubleFunction<T> f) {
hermesGauge.registerGauge(ACK_LEADER_FAILED_BATCHES_TOTAL, stateObj, f);
}

public void registerAckAllMetadataAgeGauge(Producer<byte[], byte[]> producer) {
registerMetadataAgeGauge(producer, ACK_ALL_METADATA_AGE);
public <T> void registerAckAllMetadataAgeGauge(T stateObj, ToDoubleFunction<T> f) {
hermesGauge.registerGauge(ACK_ALL_METADATA_AGE, stateObj, f);
}

public void registerAckLeaderMetadataAgeGauge(Producer<byte[], byte[]> producer) {
registerMetadataAgeGauge(producer, ACK_LEADER_METADATA_AGE);
public <T> void registerAckLeaderMetadataAgeGauge(T stateObj, ToDoubleFunction<T> f) {
hermesGauge.registerGauge(ACK_LEADER_METADATA_AGE, stateObj, f);
}

public void registerAckAllRecordQueueTimeMaxGauge(Producer<byte[], byte[]> producer) {
registerRecordQueueTimeMaxGauge(producer, ACK_ALL_RECORD_QUEUE_TIME_MAX);
public <T> void registerAckAllRecordQueueTimeMaxGauge(T stateObj, ToDoubleFunction<T> f) {
hermesGauge.registerGauge(ACK_ALL_RECORD_QUEUE_TIME_MAX, stateObj, f);
}

public void registerAckLeaderRecordQueueTimeMaxGauge(Producer<byte[], byte[]> producer) {
registerRecordQueueTimeMaxGauge(producer, ACK_LEADER_RECORD_QUEUE_TIME_MAX);
}

public void registerAckAllMaxLatencyPerBrokerGauge(Producer<byte[], byte[]> producer, List<Node> brokers) {
registerLatencyPerBrokerGauge(producer, "request-latency-max", ACK_ALL, brokers);
}

public void registerAckLeaderMaxLatencyPerBrokerGauge(Producer<byte[], byte[]> producer, List<Node> brokers) {
registerLatencyPerBrokerGauge(producer, "request-latency-max", ACK_LEADER, brokers);
}

public void registerAckAllAvgLatencyPerBrokerGauge(Producer<byte[], byte[]> producer, List<Node> brokers) {
registerLatencyPerBrokerGauge(producer, "request-latency-avg", ACK_ALL, brokers);
}

public void registerAckLeaderAvgLatencyPerBrokerGauge(Producer<byte[], byte[]> producer, List<Node> brokers) {
registerLatencyPerBrokerGauge(producer, "request-latency-avg", ACK_LEADER, brokers);
public <T> void registerAckLeaderRecordQueueTimeMaxGauge(T stateObj, ToDoubleFunction<T> f) {
hermesGauge.registerGauge(ACK_LEADER_RECORD_QUEUE_TIME_MAX, stateObj, f);
}

public double getBufferTotalBytes() {
Expand All @@ -116,125 +92,37 @@ public double getBufferAvailableBytes() {
+ meterRegistry.get(ACK_LEADER_BUFFER_AVAILABLE_BYTES).gauge().value();
}

public void registerProducerInflightRequestGauge(AtomicInteger atomicInteger) {
meterRegistry.gauge(INFLIGHT_REQUESTS, atomicInteger, AtomicInteger::get);
hermesMetrics.registerProducerInflightRequest(atomicInteger::get);
public <T> void registerProducerInflightRequestGauge(T stateObj, ToDoubleFunction<T> f) {
meterRegistry.gauge(INFLIGHT_REQUESTS, stateObj, f);
hermesMetrics.registerProducerInflightRequest(() -> (int) f.applyAsDouble(stateObj));
}

private void registerTotalBytesGauge(Producer<byte[], byte[]> producer, String gauge) {
registerProducerGauge(
producer,
new MetricName("buffer-total-bytes", "producer-metrics", "buffer total bytes", Collections.emptyMap()),
gauge
);
public <T> void registerAckAllMaxLatencyBrokerGauge(T stateObj, ToDoubleFunction<T> f, String brokerNodeId) {
registerLatencyPerBrokerGauge(stateObj, f, "request-latency-max", ACK_ALL, brokerNodeId);
}

private void registerAvailableBytesGauge(Producer<byte[], byte[]> producer, String gauge) {
registerProducerGauge(
producer,
new MetricName("buffer-available-bytes", "producer-metrics", "buffer available bytes", Collections.emptyMap()),
gauge
);
public <T> void registerAckLeaderMaxLatencyPerBrokerGauge(T stateObj, ToDoubleFunction<T> f, String brokerNodeId) {
registerLatencyPerBrokerGauge(stateObj, f, "request-latency-max", ACK_LEADER, brokerNodeId);
}

private void registerCompressionRateGauge(Producer<byte[], byte[]> producer, String gauge) {
registerProducerGauge(
producer,
new MetricName("compression-rate-avg", "producer-metrics", "average compression rate", Collections.emptyMap()),
gauge
);
public <T >void registerAckAllAvgLatencyPerBrokerGauge(T stateObj, ToDoubleFunction<T> f, String brokerNodeId) {

Check warning on line 108 in hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/ProducerMetrics.java

View workflow job for this annotation

GitHub Actions / checkstyle-hermes-common

[checkstyle-hermes-common] hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/ProducerMetrics.java#L108 <com.puppycrawl.tools.checkstyle.checks.whitespace.GenericWhitespaceCheck>

GenericWhitespace '>' is preceded with whitespace.
Raw output
/home/runner/work/hermes/hermes/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/ProducerMetrics.java:108:15: warning: GenericWhitespace '>' is preceded with whitespace. (com.puppycrawl.tools.checkstyle.checks.whitespace.GenericWhitespaceCheck)

Check warning on line 108 in hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/ProducerMetrics.java

View workflow job for this annotation

GitHub Actions / checkstyle-hermes-common

[checkstyle-hermes-common] hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/ProducerMetrics.java#L108 <com.puppycrawl.tools.checkstyle.checks.whitespace.GenericWhitespaceCheck>

GenericWhitespace '>' should followed by whitespace.
Raw output
/home/runner/work/hermes/hermes/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/ProducerMetrics.java:108:15: warning: GenericWhitespace '>' should followed by whitespace. (com.puppycrawl.tools.checkstyle.checks.whitespace.GenericWhitespaceCheck)
registerLatencyPerBrokerGauge(stateObj, f, "request-latency-avg", ACK_ALL, brokerNodeId);
}

private void registerFailedBatchesGauge(Producer<byte[], byte[]> producer, String gauge) {
registerProducerGauge(
producer,
new MetricName("record-error-total", "producer-metrics", "failed publishing batches", Collections.emptyMap()),
gauge
);
public <T> void registerAckLeaderAvgLatencyPerBrokerGauge(T stateObj, ToDoubleFunction<T> f, String brokerNodeId) {
registerLatencyPerBrokerGauge(stateObj, f,"request-latency-avg", ACK_LEADER, brokerNodeId);

Check warning on line 113 in hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/ProducerMetrics.java

View workflow job for this annotation

GitHub Actions / checkstyle-hermes-common

[checkstyle-hermes-common] hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/ProducerMetrics.java#L113 <com.puppycrawl.tools.checkstyle.checks.whitespace.WhitespaceAfterCheck>

',' is not followed by whitespace.
Raw output
/home/runner/work/hermes/hermes/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/ProducerMetrics.java:113:50: warning: ',' is not followed by whitespace. (com.puppycrawl.tools.checkstyle.checks.whitespace.WhitespaceAfterCheck)
}

private void registerMetadataAgeGauge(Producer<byte[], byte[]> producer, String gauge) {
registerProducerGauge(
producer,
new MetricName("metadata-age", "producer-metrics", "age [s] of metadata", Collections.emptyMap()),
gauge
);
}
private <T> void registerLatencyPerBrokerGauge(T stateObj,
ToDoubleFunction<T> f,
String metricName,
String producerName,
String brokerNodeId) {
String baseMetricName = Gauges.KAFKA_PRODUCER + producerName + metricName;
String graphiteMetricName = baseMetricName + "." + escapeDots(brokerNodeId);

private void registerRecordQueueTimeMaxGauge(Producer<byte[], byte[]> producer, String gauge) {
registerProducerGauge(
producer,
new MetricName(
"record-queue-time-max",
"producer-metrics",
"maximum time [ms] that batch spent in the send buffer",
Collections.emptyMap()),
gauge
hermesGauge.registerGauge(
graphiteMetricName, baseMetricName, stateObj, f, Tags.of("broker", brokerNodeId)
);
}

private void registerLatencyPerBrokerGauge(Producer<byte[], byte[]> producer,
String metricName,
String producerName,
List<Node> brokers) {
for (Node broker : brokers) {
registerLatencyPerBrokerGauge(producer, metricName, producerName, broker);
}
}

private void registerLatencyPerBrokerGauge(Producer<byte[], byte[]> producer,
String metricName,
String producerName,
Node node) {
String baseMetricName = Gauges.KAFKA_PRODUCER + producerName + "." + metricName;
String graphiteMetricName = baseMetricName + "." + escapeDots(node.host());

Predicate<Map.Entry<MetricName, ? extends Metric>> predicate = entry -> entry.getKey().group().equals("producer-node-metrics")
&& entry.getKey().name().equals(metricName)
&& entry.getKey().tags().containsValue("node-" + node.id());

registerProducerGauge(producer, baseMetricName, graphiteMetricName, predicate, Tags.of("broker", node.host()));
}


private void registerProducerGauge(final Producer<byte[], byte[]> producer,
final MetricName producerMetricName,
final String gauge) {
Predicate<Map.Entry<MetricName, ? extends Metric>> predicate = entry -> entry.getKey().group().equals(producerMetricName.group())
&& entry.getKey().name().equals(producerMetricName.name());
registerProducerGauge(producer, gauge, gauge, predicate, Tags.empty());
}

private void registerProducerGauge(Producer<byte[], byte[]> producer,
String prometheusMetricName,
String graphiteMetricName,
Predicate<Map.Entry<MetricName, ? extends Metric>> predicate,
Tags tags) {
registerProducerGaugePrometheus(producer, prometheusMetricName, predicate, tags);
registerProducerGaugeGraphite(producer, graphiteMetricName, predicate);
}

private double findProducerMetric(Producer<byte[], byte[]> producer,
Predicate<Map.Entry<MetricName, ? extends Metric>> predicate) {
Optional<? extends Map.Entry<MetricName, ? extends Metric>> first =
producer.metrics().entrySet().stream().filter(predicate).findFirst();
double value = first.map(metricNameEntry -> metricNameEntry.getValue().value()).orElse(0.0);
return value < 0 ? 0.0 : value;
}

private void registerProducerGaugePrometheus(Producer<byte[], byte[]> producer,
String gauge,
Predicate<Map.Entry<MetricName, ? extends Metric>> predicate,
Tags tags) {
Gauge.builder(gauge, producer, p -> findProducerMetric(p, predicate))
.tags(tags)
.register(meterRegistry);
}

private void registerProducerGaugeGraphite(Producer<byte[], byte[]> producer,
String gauge,
Predicate<Map.Entry<MetricName, ? extends Metric>> predicate) {
hermesMetrics.registerGauge(gauge, () -> findProducerMetric(producer, predicate));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -52,7 +53,7 @@ public ChronicleMapMessageRepository(File file, int entries, int averageMessageS

public ChronicleMapMessageRepository(File file, int entries, int averageMessageSize, MetricsFacade metricsFacade) {
this(file, entries, averageMessageSize);
metricsFacade.persistentBufferMetrics().registerBackupStorageSizeGauge(map);
metricsFacade.persistentBufferMetrics().registerBackupStorageSizeGauge(map, Map::size);
}

@Override
Expand Down
Loading

0 comments on commit a75f60e

Please sign in to comment.