Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hermes frontend prometheus support #1692

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ static HermesServer provideHermesServer() throws IOException {
ThroughputLimiter throughputLimiter = (exampleTopic, throughput) -> quotaConfirmed();
HermesMetrics hermesMetrics = new HermesMetrics(new MetricRegistry(), new PathsCompiler(""));
MetricsFacade metricsFacade = new MetricsFacade(new SimpleMeterRegistry(), hermesMetrics);
TopicsCache topicsCache = new InMemoryTopicsCache(hermesMetrics, metricsFacade, topic);
TopicsCache topicsCache = new InMemoryTopicsCache(metricsFacade, topic);
BrokerMessageProducer brokerMessageProducer = new InMemoryBrokerMessageProducer();
RawSchemaClient rawSchemaClient = new InMemorySchemaClient(topic.getName(), loadMessageResource("schema"), 1, 1);
Trackers trackers = new Trackers(Collections.emptyList());
Expand All @@ -67,7 +67,7 @@ static HermesServer provideHermesServer() throws IOException {
return new HermesServer(
sslProperties,
hermesServerProperties,
hermesMetrics,
metricsFacade,
httpHandler,
new DisabledReadinessChecker(false),
new NoOpMessagePreviewPersister(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@

class InMemoryTopicsCache implements TopicsCache {

private final HermesMetrics oldMetrics;
private final MetricsFacade metricsFacade;
private final KafkaTopics kafkaTopics;
private final Topic topic;


InMemoryTopicsCache(HermesMetrics oldMetrics, MetricsFacade metricsFacade, Topic topic) {
this.oldMetrics = oldMetrics;
InMemoryTopicsCache(MetricsFacade metricsFacade, Topic topic) {
this.metricsFacade = metricsFacade;
this.topic = topic;
this.kafkaTopics = new KafkaTopics(new KafkaTopic(KafkaTopicName.valueOf(topic.getQualifiedName()), topic.getContentType()));
Expand All @@ -33,7 +31,6 @@ public Optional<CachedTopic> getTopic(String qualifiedTopicName) {
return Optional.of(
new CachedTopic(
topic,
oldMetrics,
metricsFacade,
kafkaTopics
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class Gauges {

public static final String ACK_ALL_BUFFER_TOTAL_BYTES = KAFKA_PRODUCER + ACK_ALL + "buffer-total-bytes";
public static final String ACK_ALL_BUFFER_AVAILABLE_BYTES = KAFKA_PRODUCER + ACK_ALL + "buffer-available-bytes";
public static final String ACK_ALL_CONFIRMS_METADATA_AGE = KAFKA_PRODUCER + ACK_ALL + "metadata-age";
public static final String ACK_ALL_METADATA_AGE = KAFKA_PRODUCER + ACK_ALL + "metadata-age";
public static final String ACK_ALL_RECORD_QUEUE_TIME_MAX = KAFKA_PRODUCER + ACK_ALL + "record-queue-time-max";
public static final String ACK_ALL_COMPRESSION_RATE = KAFKA_PRODUCER + ACK_ALL + "compression-rate-avg";
public static final String ACK_ALL_FAILED_BATCHES_TOTAL = KAFKA_PRODUCER + ACK_ALL + "failed-batches-total";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@ public class MetricsFacade {
private final TopicMetrics topicMetrics;
private final SubscriptionMetrics subscriptionMetrics;
private final ConsumerMetrics consumerMetrics;
private final PersistentBufferMetrics persistentBufferMetrics;
private final ProducerMetrics producerMetrics;

public MetricsFacade(MeterRegistry meterRegistry,
HermesMetrics hermesMetrics) {
this.topicMetrics = new TopicMetrics(hermesMetrics, meterRegistry);
this.subscriptionMetrics = new SubscriptionMetrics(hermesMetrics, meterRegistry);
this.consumerMetrics = new ConsumerMetrics(hermesMetrics, meterRegistry);
this.persistentBufferMetrics = new PersistentBufferMetrics(hermesMetrics, meterRegistry);
this.producerMetrics = new ProducerMetrics(hermesMetrics, meterRegistry);
}

public TopicMetrics topics() {
Expand All @@ -26,5 +30,13 @@ public SubscriptionMetrics subscriptionMetrics() {
public ConsumerMetrics consumers() {
return consumerMetrics;
}

public PersistentBufferMetrics persistentBufferMetrics() {
return persistentBufferMetrics;
}

public ProducerMetrics producerMetrics() {
return producerMetrics;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package pl.allegro.tech.hermes.common.metric;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;

import java.util.Map;

/**
 * Registers metrics describing the size of the persistent (backup) message buffer.
 * Each metric is published to both back-ends: the legacy Graphite-backed
 * {@link HermesMetrics} and the Micrometer {@link MeterRegistry} (Prometheus support).
 */
public class PersistentBufferMetrics {

    private final HermesMetrics hermesMetrics;
    private final MeterRegistry meterRegistry;

    public PersistentBufferMetrics(HermesMetrics hermesMetrics, MeterRegistry meterRegistry) {
        this.hermesMetrics = hermesMetrics;
        this.meterRegistry = meterRegistry;
    }

    /**
     * Exposes the entry count of the given backup-storage map as a gauge in both registries.
     * The map itself is captured by the gauges, so its size is sampled live on each scrape.
     */
    public void registerBackupStorageSizeGauge(Map<?, ?> backupStorage) {
        hermesMetrics.registerMessageRepositorySizeGauge(backupStorage::size);
        meterRegistry.gaugeMapSize("backup-storage.size", Tags.empty(), backupStorage);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
package pl.allegro.tech.hermes.common.metric;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

import static pl.allegro.tech.hermes.common.metric.Gauges.ACK_ALL;
import static pl.allegro.tech.hermes.common.metric.Gauges.ACK_ALL_BUFFER_AVAILABLE_BYTES;
import static pl.allegro.tech.hermes.common.metric.Gauges.ACK_ALL_BUFFER_TOTAL_BYTES;
import static pl.allegro.tech.hermes.common.metric.Gauges.ACK_ALL_COMPRESSION_RATE;
import static pl.allegro.tech.hermes.common.metric.Gauges.ACK_ALL_FAILED_BATCHES_TOTAL;
import static pl.allegro.tech.hermes.common.metric.Gauges.ACK_ALL_METADATA_AGE;
import static pl.allegro.tech.hermes.common.metric.Gauges.ACK_ALL_RECORD_QUEUE_TIME_MAX;
import static pl.allegro.tech.hermes.common.metric.Gauges.ACK_LEADER;
import static pl.allegro.tech.hermes.common.metric.Gauges.ACK_LEADER_BUFFER_AVAILABLE_BYTES;
import static pl.allegro.tech.hermes.common.metric.Gauges.ACK_LEADER_BUFFER_TOTAL_BYTES;
import static pl.allegro.tech.hermes.common.metric.Gauges.ACK_LEADER_COMPRESSION_RATE;
import static pl.allegro.tech.hermes.common.metric.Gauges.ACK_LEADER_FAILED_BATCHES_TOTAL;
import static pl.allegro.tech.hermes.common.metric.Gauges.ACK_LEADER_METADATA_AGE;
import static pl.allegro.tech.hermes.common.metric.Gauges.ACK_LEADER_RECORD_QUEUE_TIME_MAX;
import static pl.allegro.tech.hermes.common.metric.Gauges.INFLIGHT_REQUESTS;
import static pl.allegro.tech.hermes.common.metric.HermesMetrics.escapeDots;

// exposes kafka producer metrics, see: https://docs.confluent.io/platform/current/kafka/monitoring.html#producer-metrics
public class ProducerMetrics {
    // Every gauge is registered twice: once in the legacy Graphite-backed HermesMetrics
    // and once in the Micrometer MeterRegistry (Prometheus support).
    private final HermesMetrics hermesMetrics;
    private final MeterRegistry meterRegistry;

    public ProducerMetrics(HermesMetrics hermesMetrics, MeterRegistry meterRegistry) {
        this.hermesMetrics = hermesMetrics;
        this.meterRegistry = meterRegistry;
    }

    // --- public registration API: one method per (metric, ack-mode) pair ------------------

    public void registerAckAllTotalBytesGauge(Producer<byte[], byte[]> producer) {
        registerTotalBytesGauge(producer, ACK_ALL_BUFFER_TOTAL_BYTES);
    }

    public void registerAckLeaderTotalBytesGauge(Producer<byte[], byte[]> producer) {
        registerTotalBytesGauge(producer, ACK_LEADER_BUFFER_TOTAL_BYTES);
    }

    public void registerAckAllAvailableBytesGauge(Producer<byte[], byte[]> producer) {
        registerAvailableBytesGauge(producer, ACK_ALL_BUFFER_AVAILABLE_BYTES);
    }

    public void registerAckLeaderAvailableBytesGauge(Producer<byte[], byte[]> producer) {
        registerAvailableBytesGauge(producer, ACK_LEADER_BUFFER_AVAILABLE_BYTES);
    }

    public void registerAckAllCompressionRateGauge(Producer<byte[], byte[]> producer) {
        registerCompressionRateGauge(producer, ACK_ALL_COMPRESSION_RATE);
    }

    public void registerAckLeaderCompressionRateGauge(Producer<byte[], byte[]> producer) {
        registerCompressionRateGauge(producer, ACK_LEADER_COMPRESSION_RATE);
    }

    public void registerAckAllFailedBatchesGauge(Producer<byte[], byte[]> producer) {
        registerFailedBatchesGauge(producer, ACK_ALL_FAILED_BATCHES_TOTAL);
    }

    public void registerAckLeaderFailedBatchesGauge(Producer<byte[], byte[]> producer) {
        registerFailedBatchesGauge(producer, ACK_LEADER_FAILED_BATCHES_TOTAL);
    }

    public void registerAckAllMetadataAgeGauge(Producer<byte[], byte[]> producer) {
        registerMetadataAgeGauge(producer, ACK_ALL_METADATA_AGE);
    }

    public void registerAckLeaderMetadataAgeGauge(Producer<byte[], byte[]> producer) {
        registerMetadataAgeGauge(producer, ACK_LEADER_METADATA_AGE);
    }

    public void registerAckAllRecordQueueTimeMaxGauge(Producer<byte[], byte[]> producer) {
        registerRecordQueueTimeMaxGauge(producer, ACK_ALL_RECORD_QUEUE_TIME_MAX);
    }

    public void registerAckLeaderRecordQueueTimeMaxGauge(Producer<byte[], byte[]> producer) {
        registerRecordQueueTimeMaxGauge(producer, ACK_LEADER_RECORD_QUEUE_TIME_MAX);
    }

    // Per-broker latency gauges: one gauge per node in `brokers`.
    public void registerAckAllMaxLatencyPerBrokerGauge(Producer<byte[], byte[]> producer, List<Node> brokers) {
        registerLatencyPerBrokerGauge(producer, "request-latency-max", ACK_ALL, brokers);
    }

    public void registerAckLeaderMaxLatencyPerBrokerGauge(Producer<byte[], byte[]> producer, List<Node> brokers) {
        registerLatencyPerBrokerGauge(producer, "request-latency-max", ACK_LEADER, brokers);
    }

    public void registerAckAllAvgLatencyPerBrokerGauge(Producer<byte[], byte[]> producer, List<Node> brokers) {
        registerLatencyPerBrokerGauge(producer, "request-latency-avg", ACK_ALL, brokers);
    }

    public void registerAckLeaderAvgLatencyPerBrokerGauge(Producer<byte[], byte[]> producer, List<Node> brokers) {
        registerLatencyPerBrokerGauge(producer, "request-latency-avg", ACK_LEADER, brokers);
    }

    // Sum of ack-all and ack-leader buffer sizes, read back from the Micrometer registry.
    // NOTE(review): MeterRegistry.get(...) throws if the gauge is absent — assumes both
    // the ack-all and ack-leader total-bytes gauges were registered first; verify callers.
    public double getBufferTotalBytes() {
        return meterRegistry.get(ACK_ALL_BUFFER_TOTAL_BYTES).gauge().value()
                + meterRegistry.get(ACK_LEADER_BUFFER_TOTAL_BYTES).gauge().value();
    }

    // Same precondition as getBufferTotalBytes(), for the available-bytes gauges.
    public double getBufferAvailableBytes() {
        return meterRegistry.get(ACK_ALL_BUFFER_AVAILABLE_BYTES).gauge().value()
                + meterRegistry.get(ACK_LEADER_BUFFER_AVAILABLE_BYTES).gauge().value();
    }

    // Exposes the caller-owned in-flight request counter in both registries;
    // the counter is sampled live via AtomicInteger::get.
    public void registerProducerInflightRequestGauge(AtomicInteger atomicInteger) {
        meterRegistry.gauge(INFLIGHT_REQUESTS, atomicInteger, AtomicInteger::get);
        hermesMetrics.registerProducerInflightRequest(atomicInteger::get);
    }

    // --- private helpers: map a Hermes gauge name to the matching Kafka client metric -----

    private void registerTotalBytesGauge(Producer<byte[], byte[]> producer, String gauge) {
        registerProducerGauge(
                producer,
                new MetricName("buffer-total-bytes", "producer-metrics", "buffer total bytes", Collections.emptyMap()),
                gauge
        );
    }

    private void registerAvailableBytesGauge(Producer<byte[], byte[]> producer, String gauge) {
        registerProducerGauge(
                producer,
                new MetricName("buffer-available-bytes", "producer-metrics", "buffer available bytes", Collections.emptyMap()),
                gauge
        );
    }

    private void registerCompressionRateGauge(Producer<byte[], byte[]> producer, String gauge) {
        registerProducerGauge(
                producer,
                new MetricName("compression-rate-avg", "producer-metrics", "average compression rate", Collections.emptyMap()),
                gauge
        );
    }

    private void registerFailedBatchesGauge(Producer<byte[], byte[]> producer, String gauge) {
        registerProducerGauge(
                producer,
                new MetricName("record-error-total", "producer-metrics", "failed publishing batches", Collections.emptyMap()),
                gauge
        );
    }

    private void registerMetadataAgeGauge(Producer<byte[], byte[]> producer, String gauge) {
        registerProducerGauge(
                producer,
                new MetricName("metadata-age", "producer-metrics", "age [s] of metadata", Collections.emptyMap()),
                gauge
        );
    }

    private void registerRecordQueueTimeMaxGauge(Producer<byte[], byte[]> producer, String gauge) {
        registerProducerGauge(
                producer,
                new MetricName(
                        "record-queue-time-max",
                        "producer-metrics",
                        "maximum time [ms] that batch spent in the send buffer",
                        Collections.emptyMap()),
                gauge
        );
    }

    private void registerLatencyPerBrokerGauge(Producer<byte[], byte[]> producer,
                                               String metricName,
                                               String producerName,
                                               List<Node> brokers) {
        for (Node broker : brokers) {
            registerLatencyPerBrokerGauge(producer, metricName, producerName, broker);
        }
    }

    // Registers one per-broker latency gauge. Prometheus uses the base name plus a
    // "broker" tag; Graphite encodes the broker host in the metric name (dots escaped).
    private void registerLatencyPerBrokerGauge(Producer<byte[], byte[]> producer,
                                               String metricName,
                                               String producerName,
                                               Node node) {
        String baseMetricName = Gauges.KAFKA_PRODUCER + producerName + "." + metricName;
        String graphiteMetricName = baseMetricName + "." + escapeDots(node.host());

        // Kafka reports per-node metrics in group "producer-node-metrics" with a
        // node id carried in the metric tags ("node-<id>").
        Predicate<Map.Entry<MetricName, ? extends Metric>> predicate = entry -> entry.getKey().group().equals("producer-node-metrics")
                && entry.getKey().name().equals(metricName)
                && entry.getKey().tags().containsValue("node-" + node.id());

        registerProducerGauge(producer, baseMetricName, graphiteMetricName, predicate, Tags.of("broker", node.host()));
    }


    // Convenience overload: matches a producer-level metric by group+name and
    // registers it under the same name in both back-ends, with no extra tags.
    private void registerProducerGauge(final Producer<byte[], byte[]> producer,
                                       final MetricName producerMetricName,
                                       final String gauge) {
        Predicate<Map.Entry<MetricName, ? extends Metric>> predicate = entry -> entry.getKey().group().equals(producerMetricName.group())
                && entry.getKey().name().equals(producerMetricName.name());
        registerProducerGauge(producer, gauge, gauge, predicate, Tags.empty());
    }

    private void registerProducerGauge(Producer<byte[], byte[]> producer,
                                       String prometheusMetricName,
                                       String graphiteMetricName,
                                       Predicate<Map.Entry<MetricName, ? extends Metric>> predicate,
                                       Tags tags) {
        registerProducerGaugePrometheus(producer, prometheusMetricName, predicate, tags);
        registerProducerGaugeGraphite(producer, graphiteMetricName, predicate);
    }

    // Looks up the first Kafka client metric matching `predicate` and returns its value.
    // Missing metrics and negative values both map to 0.0 (Kafka uses negative
    // sentinels for some unavailable metrics).
    private double findProducerMetric(Producer<byte[], byte[]> producer,
                                      Predicate<Map.Entry<MetricName, ? extends Metric>> predicate) {
        Optional<? extends Map.Entry<MetricName, ? extends Metric>> first =
                producer.metrics().entrySet().stream().filter(predicate).findFirst();
        double value = first.map(metricNameEntry -> metricNameEntry.getValue().value()).orElse(0.0);
        return value < 0 ? 0.0 : value;
    }

    // The gauge samples the producer lazily on each scrape; the registry keeps a
    // strong reference to the producer for the gauge's lifetime.
    private void registerProducerGaugePrometheus(Producer<byte[], byte[]> producer,
                                                 String gauge,
                                                 Predicate<Map.Entry<MetricName, ? extends Metric>> predicate,
                                                 Tags tags) {
        Gauge.builder(gauge, producer, p -> findProducerMetric(p, predicate))
                .tags(tags)
                .register(meterRegistry);
    }

    private void registerProducerGaugeGraphite(Producer<byte[], byte[]> producer,
                                               String gauge,
                                               Predicate<Map.Entry<MetricName, ? extends Metric>> predicate) {
        hermesMetrics.registerGauge(gauge, () -> findProducerMetric(producer, predicate));
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import com.codahale.metrics.Meter;
import io.micrometer.core.instrument.Counter;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.metrics.HermesCounter;
import pl.allegro.tech.hermes.metrics.counters.DefaultHermesCounterWithRate;

public class SubscriptionHermesCounter extends HermesCounter {
public class SubscriptionHermesCounter extends DefaultHermesCounterWithRate {

private final String graphiteName;
private final SubscriptionName subscription;
Expand Down
Loading
Loading