Skip to content

Commit

Permalink
checkstyle
Browse files Browse the repository at this point in the history
  • Loading branch information
moscicky committed Jul 13, 2023
1 parent e983573 commit 5555c7f
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,21 @@ public void registerAckLeaderAvgLatencyPerBrokerGauge(Producer<byte[], byte[]> p
registerLatencyPerBrokerGauge(producer, "request-latency-avg", ACK_LEADER, brokers);
}

public double getBufferTotalBytes() {
return meterRegistry.get(ACK_ALL_BUFFER_TOTAL_BYTES).gauge().value()
+ meterRegistry.get(ACK_LEADER_BUFFER_TOTAL_BYTES).gauge().value();
}

public double getBufferAvailableBytes() {
return meterRegistry.get(ACK_ALL_BUFFER_AVAILABLE_BYTES).gauge().value()
+ meterRegistry.get(ACK_LEADER_BUFFER_AVAILABLE_BYTES).gauge().value();
}

public void registerProducerInflightRequestGauge(AtomicInteger atomicInteger) {
meterRegistry.gauge(INFLIGHT_REQUESTS, atomicInteger, AtomicInteger::get);
hermesMetrics.registerProducerInflightRequest(atomicInteger::get);
}

private void registerTotalBytesGauge(Producer<byte[], byte[]> producer, String gauge) {
registerProducerGauge(
producer,
Expand Down Expand Up @@ -146,7 +161,7 @@ private void registerMetadataAgeGauge(Producer<byte[], byte[]> producer, String
);
}

public void registerRecordQueueTimeMaxGauge(Producer<byte[], byte[]> producer, String gauge) {
private void registerRecordQueueTimeMaxGauge(Producer<byte[], byte[]> producer, String gauge) {
registerProducerGauge(
producer,
new MetricName(
Expand All @@ -158,21 +173,6 @@ public void registerRecordQueueTimeMaxGauge(Producer<byte[], byte[]> producer, S
);
}

public double getBufferTotalBytes() {
return meterRegistry.get(ACK_ALL_BUFFER_TOTAL_BYTES).gauge().value()
+ meterRegistry.get(ACK_LEADER_BUFFER_TOTAL_BYTES).gauge().value();
}

public double getBufferAvailableBytes() {
return meterRegistry.get(ACK_ALL_BUFFER_AVAILABLE_BYTES).gauge().value()
+ meterRegistry.get(ACK_LEADER_BUFFER_AVAILABLE_BYTES).gauge().value();
}

public void registerProducerInflightRequestGauge(AtomicInteger atomicInteger) {
meterRegistry.gauge(INFLIGHT_REQUESTS, atomicInteger, AtomicInteger::get);
hermesMetrics.registerProducerInflightRequest(atomicInteger::get);
}

private void registerLatencyPerBrokerGauge(Producer<byte[], byte[]> producer,
String metricName,
String producerName,
Expand All @@ -186,39 +186,55 @@ private void registerLatencyPerBrokerGauge(Producer<byte[], byte[]> producer,
String metricName,
String producerName,
Node node) {
String gauge = Gauges.KAFKA_PRODUCER + producerName + "." + metricName + "." + escapeDots(node.host());
String baseMetricName = Gauges.KAFKA_PRODUCER + producerName + "." + metricName;
String graphiteMetricName = baseMetricName + "." + escapeDots(node.host());

Predicate<Map.Entry<MetricName, ? extends Metric>> predicate = entry -> entry.getKey().group().equals("producer-node-metrics")
&& entry.getKey().name().equals(metricName)
&& entry.getKey().tags().containsValue("node-" + node.id());
registerProducerGaugeGraphite(producer, gauge, predicate);
registerProducerGaugePrometheus(producer, gauge, predicate, Tags.of("broker", node.host()));

registerProducerGauge(producer, baseMetricName, graphiteMetricName, predicate, Tags.of("broker", node.host()));
}


private void registerProducerGauge(final Producer<byte[], byte[]> producer,
final MetricName producerMetricName,
final String gauge) {
Predicate<Map.Entry<MetricName, ? extends Metric>> predicate = entry -> entry.getKey().group().equals(producerMetricName.group()) && entry.getKey().name().equals(producerMetricName.name());
registerProducerGaugeGraphite(producer, gauge, predicate);
registerProducerGaugePrometheus(producer, gauge, predicate, Tags.empty());
Predicate<Map.Entry<MetricName, ? extends Metric>> predicate = entry -> entry.getKey().group().equals(producerMetricName.group())
&& entry.getKey().name().equals(producerMetricName.name());
registerProducerGauge(producer, gauge, gauge, predicate, Tags.empty());
}

private void registerProducerGauge(Producer<byte[], byte[]> producer,
String prometheusMetricName,
String graphiteMetricName,
Predicate<Map.Entry<MetricName, ? extends Metric>> predicate,
Tags tags) {
registerProducerGaugePrometheus(producer, prometheusMetricName, predicate, tags);
registerProducerGaugeGraphite(producer, graphiteMetricName, predicate);
}

private double findProducerMetricByName(Producer<byte[], byte[]> producer, Predicate<Map.Entry<MetricName, ? extends Metric>> predicate) {
private double findProducerMetric(Producer<byte[], byte[]> producer,
Predicate<Map.Entry<MetricName, ? extends Metric>> predicate) {
Optional<? extends Map.Entry<MetricName, ? extends Metric>> first =
producer.metrics().entrySet().stream().filter(predicate).findFirst();
double value = first.map(metricNameEntry -> metricNameEntry.getValue().value()).orElse(0.0);
return value < 0 ? 0.0 : value;
}

private void registerProducerGaugePrometheus(Producer<byte[], byte[]> producer, String gauge, Predicate<Map.Entry<MetricName, ? extends Metric>> predicate, Tags tags) {
Gauge.builder(gauge, producer, p -> findProducerMetricByName(p, predicate))
private void registerProducerGaugePrometheus(Producer<byte[], byte[]> producer,
String gauge,
Predicate<Map.Entry<MetricName, ? extends Metric>> predicate,
Tags tags) {
Gauge.builder(gauge, producer, p -> findProducerMetric(p, predicate))
.tags(tags)
.register(meterRegistry);
}

private void registerProducerGaugeGraphite(Producer<byte[], byte[]> producer, String gauge, Predicate<Map.Entry<MetricName, ? extends Metric>> predicate) {
hermesMetrics.registerGauge(gauge, () -> findProducerMetricByName(producer, predicate));
private void registerProducerGaugeGraphite(Producer<byte[], byte[]> producer,
String gauge,
Predicate<Map.Entry<MetricName, ? extends Metric>> predicate) {
hermesMetrics.registerGauge(gauge, () -> findProducerMetric(producer, predicate));
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import pl.allegro.tech.hermes.metrics.HermesHistogram;
import pl.allegro.tech.hermes.metrics.HermesTimer;


import static pl.allegro.tech.hermes.common.metric.Meters.DELAYED_PROCESSING;
import static pl.allegro.tech.hermes.common.metric.Meters.METER;
import static pl.allegro.tech.hermes.common.metric.Meters.THROUGHPUT_BYTES;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ private void awaitBufferFlush() throws InterruptedException {
}

private boolean isBufferEmpty() {
long bufferUsedBytes = (long) (metrics.producerMetrics().getBufferTotalBytes() - metrics.producerMetrics().getBufferAvailableBytes());
long bufferUsedBytes = (long) (metrics.producerMetrics().getBufferTotalBytes()
- metrics.producerMetrics().getBufferAvailableBytes());
logger.info("Buffer flush: {} bytes still in use", bufferUsedBytes);
return bufferUsedBytes < TOLERANCE_BYTES;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package pl.allegro.tech.hermes.metrics;


import io.micrometer.core.instrument.distribution.Histogram;

public class HermesHistogram {
private final io.micrometer.core.instrument.DistributionSummary micrometerHistogram;
private final com.codahale.metrics.Histogram graphiteHistogram;

private HermesHistogram(io.micrometer.core.instrument.DistributionSummary micrometerHistogram, com.codahale.metrics.Histogram graphiteHistogram) {
private HermesHistogram(io.micrometer.core.instrument.DistributionSummary micrometerHistogram,
com.codahale.metrics.Histogram graphiteHistogram) {
this.micrometerHistogram = micrometerHistogram;
this.graphiteHistogram = graphiteHistogram;
}
Expand Down

0 comments on commit 5555c7f

Please sign in to comment.