Skip to content

Commit

Permalink
UDS datagram support in StatsD (#2722)
Browse files Browse the repository at this point in the history
Utilizes the support in Netty and Reactor Netty for Unix domain socket datagram protocol via the `UdpClient`. The StatsdConfig `host` should be the path to the socket when the `protocol` is configured to `UDS_DATAGRAM`.

Resolves gh-792
  • Loading branch information
shakuzen committed Jul 29, 2021
1 parent 944032a commit 24b19c7
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 25 deletions.
11 changes: 11 additions & 0 deletions implementations/micrometer-registry-statsd/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,27 @@ plugins {
id 'com.github.johnrengelman.shadow' version '7.0.0'
}

repositories {
// TODO remove when reactor-netty 1.0.10 is released
maven { url 'https://repo.spring.io/snapshot' }
}

dependencies {
api project(':micrometer-core')

implementation 'io.projectreactor:reactor-core'
implementation 'io.projectreactor.netty:reactor-netty-core'
constraints {
// TODO remove when reactor-netty 1.0.10 is released
implementation 'io.projectreactor.netty:reactor-netty-core:1.0.10-SNAPSHOT'
}

testImplementation project(':micrometer-test')
testImplementation 'io.projectreactor:reactor-test'
testImplementation 'ch.qos.logback:logback-classic'
testImplementation 'org.awaitility:awaitility'
// for running tests with UDS on OSX
testImplementation 'io.netty:netty-transport-native-kqueue:4.1.66.Final:osx-x86_64'
}

shadowJar {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ default boolean enabled() {
}

/**
* @return The host name of the StatsD agent.
* @return Host (or socket in case of Unix domain socket protocol) to receive StatsD metrics.
*/
default String host() {
return getString(this, "host").orElse("localhost");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.micrometer.core.lang.Nullable;
import io.micrometer.core.util.internal.logging.WarnThenDebugLogger;
import io.micrometer.statsd.internal.*;
import io.netty.channel.unix.DomainSocketAddress;
import io.netty.util.AttributeKey;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
Expand All @@ -40,18 +41,17 @@
import reactor.netty.udp.UdpClient;
import reactor.util.retry.Retry;

import java.net.InetSocketAddress;
import java.net.PortUnreachableException;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.ToDoubleFunction;
import java.util.function.ToLongFunction;
import java.util.function.*;
import java.util.stream.DoubleStream;

/**
Expand Down Expand Up @@ -219,19 +219,20 @@ public void onComplete() {
publisher = this.sink.asFlux();
}
if (statsdConfig.protocol() == StatsdProtocol.UDP) {
prepareUdpClient(publisher);
prepareUdpClient(publisher, () -> InetSocketAddress.createUnresolved(statsdConfig.host(), statsdConfig.port()));
} else if (statsdConfig.protocol() == StatsdProtocol.UDS_DATAGRAM) {
prepareUdpClient(publisher, () -> new DomainSocketAddress(statsdConfig.host()));
} else if (statsdConfig.protocol() == StatsdProtocol.TCP) {
prepareTcpClient(publisher);
}
}
}
}

private void prepareUdpClient(Publisher<String> publisher) {
private void prepareUdpClient(Publisher<String> publisher, Supplier<SocketAddress> remoteAddress) {
AtomicReference<UdpClient> udpClientReference = new AtomicReference<>();
UdpClient udpClient = UdpClient.create()
.host(statsdConfig.host())
.port(statsdConfig.port())
.remoteAddress(remoteAddress)
.handle((in, out) -> out
.sendString(publisher)
.neverComplete()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,7 @@
*/
public enum StatsdProtocol {
UDP,
/** Unix domain socket datagram */
UDS_DATAGRAM,
TCP
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void invalid() {
assertThat(config.validate().failures().stream().map(Validated.Invalid::getMessage))
.containsOnly(
"should be one of 'ETSY', 'DATADOG', 'TELEGRAF', 'SYSDIG'",
"should be one of 'UDP', 'TCP'",
"should be one of 'UDP', 'UDS_DATAGRAM', 'TCP'",
"must contain a valid time unit"
)
.hasSize(4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.unix.DomainSocketAddress;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.OS;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import reactor.core.Disposable;
Expand All @@ -34,6 +36,7 @@
import reactor.netty.tcp.TcpServer;
import reactor.netty.udp.UdpServer;

import java.io.File;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
Expand All @@ -44,6 +47,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

/**
* Tests {@link StatsdMeterRegistry} metrics publishing functionality.
Expand All @@ -52,6 +56,7 @@
* @author Johnny Lim
*/
class StatsdMeterRegistryPublishTest {
public static final String UDS_DATAGRAM_SOCKET_PATH = "/tmp/test-server.sock";

StatsdMeterRegistry meterRegistry;
DisposableChannel server;
Expand All @@ -62,7 +67,9 @@ class StatsdMeterRegistryPublishTest {

@AfterEach
void cleanUp() {
meterRegistry.close();
if (meterRegistry != null) {
meterRegistry.close();
}
if (server != null) {
server.disposeNow();
}
Expand All @@ -71,10 +78,11 @@ void cleanUp() {
@ParameterizedTest
@EnumSource(StatsdProtocol.class)
void receiveMetricsSuccessfully(StatsdProtocol protocol) throws InterruptedException {
skipUdsTestOnWindows(protocol);
serverLatch = new CountDownLatch(3);
server = startServer(protocol, 0);

final int port = getPort();
final int port = getPort(protocol);

meterRegistry = new StatsdMeterRegistry(getUnbufferedConfig(protocol, port), Clock.SYSTEM);
startRegistryAndWaitForClient();
Expand All @@ -88,11 +96,12 @@ void receiveMetricsSuccessfully(StatsdProtocol protocol) throws InterruptedExcep
@ParameterizedTest
@EnumSource(StatsdProtocol.class)
void resumeSendingMetrics_whenServerIntermittentlyFails(StatsdProtocol protocol) throws InterruptedException {
skipUdsTestOnWindows(protocol);
serverLatch = new CountDownLatch(1);
AtomicInteger writeCount = new AtomicInteger();
server = startServer(protocol, 0);

final int port = getPort();
final int port = getPort(protocol);

meterRegistry = new StatsdMeterRegistry(getUnbufferedConfig(protocol, port), Clock.SYSTEM);
startRegistryAndWaitForClient();
Expand Down Expand Up @@ -134,10 +143,11 @@ void resumeSendingMetrics_whenServerIntermittentlyFails(StatsdProtocol protocol)
@EnumSource(StatsdProtocol.class)
@Issue("#1676")
void stopAndStartMeterRegistrySendsMetrics(StatsdProtocol protocol) throws InterruptedException {
skipUdsTestOnWindows(protocol);
serverLatch = new CountDownLatch(3);
server = startServer(protocol, 0);

final int port = getPort();
final int port = getPort(protocol);

meterRegistry = new StatsdMeterRegistry(getUnbufferedConfig(protocol, port), Clock.SYSTEM);
startRegistryAndWaitForClient();
Expand Down Expand Up @@ -175,11 +185,12 @@ void stopAndStartMeterRegistryWithLineSink() throws InterruptedException {
@ParameterizedTest
@EnumSource(StatsdProtocol.class)
void whenBackendInitiallyDown_metricsSentAfterBackendStarts(StatsdProtocol protocol) throws InterruptedException {
skipUdsTestOnWindows(protocol);
AtomicInteger writeCount = new AtomicInteger();
serverLatch = new CountDownLatch(3);
// start server to secure an open port
server = startServer(protocol, 0);
final int port = getPort();
final int port = getPort(protocol);
server.disposeNow();
meterRegistry = new StatsdMeterRegistry(getUnbufferedConfig(protocol, port), Clock.SYSTEM);
meterRegistry.start();
Expand All @@ -190,10 +201,10 @@ void whenBackendInitiallyDown_metricsSentAfterBackendStarts(StatsdProtocol proto
await().until(() -> writeCount.get() == 3);
}
server = startServer(protocol, port);
if (protocol == StatsdProtocol.TCP) {
// client is null until TcpClient first connects
if (protocol == StatsdProtocol.TCP || protocol == StatsdProtocol.UDS_DATAGRAM) {
// client is null until connection established
await().until(() -> meterRegistry.statsdConnection.get() != null);
// TcpClient may take some time to reconnect to the server
// client may take some time to reconnect to the server
await().until(() -> !clientIsDisposed());
}
assertThat(serverLatch.getCount()).isEqualTo(3);
Expand All @@ -213,10 +224,11 @@ void whenBackendInitiallyDown_metricsSentAfterBackendStarts(StatsdProtocol proto
@ParameterizedTest
@EnumSource(StatsdProtocol.class)
void whenRegistryStopped_doNotConnectToBackend(StatsdProtocol protocol) throws InterruptedException {
skipUdsTestOnWindows(protocol);
serverLatch = new CountDownLatch(3);
// start server to secure an open port
server = startServer(protocol, 0);
final int port = getPort();
final int port = getPort(protocol);
meterRegistry = new StatsdMeterRegistry(getUnbufferedConfig(protocol, port), Clock.SYSTEM);
startRegistryAndWaitForClient();
server.disposeNow();
Expand All @@ -232,9 +244,10 @@ void whenRegistryStopped_doNotConnectToBackend(StatsdProtocol protocol) throws I
@EnumSource(StatsdProtocol.class)
@Issue("#2177")
void whenSendError_reconnectsAndWritesNewMetrics(StatsdProtocol protocol) throws InterruptedException {
skipUdsTestOnWindows(protocol);
serverLatch = new CountDownLatch(3);
server = startServer(protocol, 0);
final int port = getPort();
final int port = getPort(protocol);
meterRegistry = new StatsdMeterRegistry(getUnbufferedConfig(protocol, port), Clock.SYSTEM);
startRegistryAndWaitForClient();
((Connection) meterRegistry.statsdConnection.get()).addHandler("writeFailure", new ChannelOutboundHandlerAdapter() {
Expand All @@ -255,15 +268,21 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
await().pollDelay(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(3)).until(() -> serverMetricReadCount.get() == 3);
}

private int getPort() {
private void skipUdsTestOnWindows(StatsdProtocol protocol) {
if (protocol == StatsdProtocol.UDS_DATAGRAM)
assumeTrue(!OS.WINDOWS.isCurrentOs());
}

private int getPort(StatsdProtocol protocol) {
if (protocol == StatsdProtocol.UDS_DATAGRAM) return 0;
return ((InetSocketAddress) server.address()).getPort();
}

private void trackWritesForUdpClient(StatsdProtocol protocol, AtomicInteger writeCount) {
if (protocol == StatsdProtocol.UDP) {
await().until(() -> meterRegistry.statsdConnection.get() != null);
((Connection) meterRegistry.statsdConnection.get())
.addHandler(new LoggingHandler("udpclient", LogLevel.INFO))
.addHandler(new LoggingHandler("testudpclient", LogLevel.INFO))
.addHandler(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
Expand All @@ -284,10 +303,10 @@ private boolean clientIsDisposed() {
}

private DisposableChannel startServer(StatsdProtocol protocol, int port) {
if (protocol == StatsdProtocol.UDP) {
if (protocol == StatsdProtocol.UDP || protocol == StatsdProtocol.UDS_DATAGRAM) {
return UdpServer.create()
.host("localhost")
.port(port)
.bindAddress(() -> protocol == StatsdProtocol.UDP ? InetSocketAddress.createUnresolved("localhost", port)
: newDomainSocketAddress())
.handle((in, out) ->
in.receive().asString()
.flatMap(packet -> {
Expand Down Expand Up @@ -328,13 +347,30 @@ private DisposableChannel startServer(StatsdProtocol protocol, int port) {
}
}

private static DomainSocketAddress newDomainSocketAddress() {
try {
File tempFile = new File(UDS_DATAGRAM_SOCKET_PATH);
tempFile.delete();
tempFile.deleteOnExit();
return new DomainSocketAddress(tempFile);
}
catch (Exception e) {
throw new RuntimeException("Error creating a temporary file", e);
}
}

private StatsdConfig getUnbufferedConfig(StatsdProtocol protocol, int port) {
return new StatsdConfig() {
@Override
public String get(String key) {
return null;
}

@Override
public String host() {
return protocol == StatsdProtocol.UDS_DATAGRAM ? UDS_DATAGRAM_SOCKET_PATH : "localhost";
}

@Override
public int port() {
return port;
Expand Down
5 changes: 5 additions & 0 deletions samples/micrometer-samples-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ plugins {
id 'java'
}

repositories {
// TODO remove when reactor-netty 1.0.10 is released
maven { url 'https://repo.spring.io/snapshot' }
}

dependencies {
implementation platform('io.projectreactor:reactor-bom:2020.0.+')

Expand Down

0 comments on commit 24b19c7

Please sign in to comment.