Skip to content

Commit

Permalink
#1056 Add support for datadog distributions
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberdelia authored and jkschneider committed Aug 28, 2021
1 parent d7bf555 commit 589899f
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -285,14 +285,14 @@ private void connectAndSubscribe(UdpClient udpClient) {
}

private void retryReplaceClient(Mono<? extends Connection> connectMono) {
connectMono
.retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1)).maxBackoff(Duration.ofMinutes(1)))
.subscribe(connection -> {
this.statsdConnection.replace(connection);
connectMono
.retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1)).maxBackoff(Duration.ofMinutes(1)))
.subscribe(connection -> {
this.statsdConnection.replace(connection);

// now that we're connected, start polling gauges and other pollable meter types
startPolling();
});
// now that we're connected, start polling gauges and other pollable meter types
startPolling();
});
}

private void startPolling() {
Expand Down Expand Up @@ -327,11 +327,19 @@ protected <T> Gauge newGauge(Meter.Id id, @Nullable T obj, ToDoubleFunction<T> v
}

private StatsdLineBuilder lineBuilder(Meter.Id id) {
return lineBuilder(id, null);
}

private StatsdLineBuilder lineBuilder(Meter.Id id, @Nullable DistributionStatisticConfig distributionStatisticConfig) {
if (lineBuilderFunction == null) {
lineBuilderFunction = id2 -> {
switch (statsdConfig.flavor()) {
case DATADOG:
return new DatadogStatsdLineBuilder(id2, config());
if (distributionStatisticConfig != null) {
return new DatadogStatsdLineBuilder(id2, config(), distributionStatisticConfig);
} else {
return new DatadogStatsdLineBuilder(id2, config());
}
case TELEGRAF:
return new TelegrafStatsdLineBuilder(id2, config());
case SYSDIG:
Expand Down Expand Up @@ -361,7 +369,7 @@ protected Counter newCounter(Meter.Id id) {

@Override
protected LongTaskTimer newLongTaskTimer(Meter.Id id, DistributionStatisticConfig distributionStatisticConfig) {
StatsdLongTaskTimer ltt = new StatsdLongTaskTimer(id, lineBuilder(id), this.sink, clock, statsdConfig.publishUnchangedMeters(),
StatsdLongTaskTimer ltt = new StatsdLongTaskTimer(id, lineBuilder(id, distributionStatisticConfig), this.sink, clock, statsdConfig.publishUnchangedMeters(),
distributionStatisticConfig, getBaseTimeUnit());
HistogramGauges.registerWithCommonFormat(ltt, this);
pollableMeters.put(id, ltt);
Expand All @@ -377,7 +385,7 @@ protected Timer newTimer(Meter.Id id, DistributionStatisticConfig distributionSt
distributionStatisticConfig = addInfBucket(distributionStatisticConfig);
}

Timer timer = new StatsdTimer(id, lineBuilder(id), this.sink, clock, distributionStatisticConfig, pauseDetector, getBaseTimeUnit(),
Timer timer = new StatsdTimer(id, lineBuilder(id, distributionStatisticConfig), this.sink, clock, distributionStatisticConfig, pauseDetector, getBaseTimeUnit(),
statsdConfig.step().toMillis());
HistogramGauges.registerWithCommonFormat(timer, this);
return timer;
Expand All @@ -392,7 +400,7 @@ protected DistributionSummary newDistributionSummary(Meter.Id id, DistributionSt
distributionStatisticConfig = addInfBucket(distributionStatisticConfig);
}

DistributionSummary summary = new StatsdDistributionSummary(id, lineBuilder(id), this.sink, clock, distributionStatisticConfig, scale);
DistributionSummary summary = new StatsdDistributionSummary(id, lineBuilder(id, distributionStatisticConfig), this.sink, clock, distributionStatisticConfig, scale);
HistogramGauges.registerWithCommonFormat(summary, this);
return summary;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@
import io.micrometer.core.instrument.Statistic;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.config.NamingConvention;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.util.DoubleFormat;
import io.micrometer.core.lang.Nullable;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

public class DatadogStatsdLineBuilder extends FlavorStatsdLineBuilder {

private static final String TYPE_DISTRIBUTION = "d";
private static final String ENTITY_ID_TAG_NAME = "dd.internal.entity_id";

private final Object conventionTagsLock = new Object();
Expand All @@ -40,16 +42,34 @@ public class DatadogStatsdLineBuilder extends FlavorStatsdLineBuilder {
@SuppressWarnings("NullableProblems")
private volatile String tagsNoStat;
private final ConcurrentMap<Statistic, String> tags = new ConcurrentHashMap<>();
private final boolean percentileHistogram;
// VisibleForTesting
@Nullable
String ddEntityId;

public DatadogStatsdLineBuilder(Meter.Id id, MeterRegistry.Config config, DistributionStatisticConfig distributionStatisticConfig) {
super(id, config);

percentileHistogram = distributionStatisticConfig.isPercentileHistogram();
ddEntityId = System.getenv("DD_ENTITY_ID");
}

public DatadogStatsdLineBuilder(Meter.Id id, MeterRegistry.Config config) {
super(id, config);

percentileHistogram = false;
ddEntityId = System.getenv("DD_ENTITY_ID");
}

@Override
public String histogram(double amount) {
if (percentileHistogram) {
return line(DoubleFormat.decimalOrNan(amount), null, TYPE_DISTRIBUTION);
} else {
return super.histogram(amount);
}
}

@Override
String line(String amount, @Nullable Statistic stat, String type) {
updateIfNamingConventionChanged();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,20 @@ void infBucketEqualsCount() {
assertThat(count).isEqualTo(1);
}

@Test
void supportsDatadogDistributions() {
final Sinks.Many<String> lines = sink();
registry = StatsdMeterRegistry.builder(configWithFlavor(StatsdFlavor.DATADOG))
.clock(clock)
.lineSink(toLineSink(lines))
.build();

StepVerifier.create(lines.asFlux())
.then(() -> DistributionSummary.builder("my.summary").publishPercentileHistogram(true).register(registry).record(1))
.expectNext("my.summary:1|d")
.verifyComplete();
}

@Test
void interactWithStoppedRegistry() {
registry = new StatsdMeterRegistry(configWithFlavor(StatsdFlavor.ETSY), clock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

import io.micrometer.core.Issue;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Statistic;
import io.micrometer.core.instrument.config.NamingConvention;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.jupiter.api.Test;

Expand All @@ -40,6 +42,26 @@ void changingNamingConvention() {
assertThat(lb.line("1", Statistic.COUNT, "c")).isEqualTo("myCounter:1|c|#statistic:count,myTag:value");
}

@Test
void useDistributions() {
DistributionSummary s = registry.summary("my.summary", "tag", "value");
DatadogStatsdLineBuilder lb = new DatadogStatsdLineBuilder(s.getId(), registry.config(), DistributionStatisticConfig.builder()
.percentilesHistogram(true)
.build());

assertThat(lb.histogram(1.0)).isEqualTo("my_summary:1|d|#tag:value");
}

@Test
void useHistograms() {
DistributionSummary s = registry.summary("my.summary", "tag", "value");
DatadogStatsdLineBuilder lb = new DatadogStatsdLineBuilder(s.getId(), registry.config(), DistributionStatisticConfig.builder()
.percentilesHistogram(false)
.build());

assertThat(lb.histogram(1.0)).isEqualTo("my_summary:1|h|#tag:value");
}

@Issue("#739")
@Test
void sanitizeColonsInTagKeys() {
Expand Down

0 comments on commit 589899f

Please sign in to comment.