Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate from gRPC to armeria for testing agent in memory exporter #5314

Closed
wants to merge 14 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class JedisClientTest extends AgentInstrumentationSpecification {

def setup() {
jedis.flushAll()
testRunner().forceFlush()
clearExportedData()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ public void afterTestClass() {
+ TestAgentListenerAccess.getIgnoredButTransformedClassNames();
}

@Override
public void forceFlush() {
AgentTestingExporterAccess.forceFlush();
}

@Override
public void clearAllExportedData() {
AgentTestingExporterAccess.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ protected InstrumentationTestRunner(OpenTelemetry openTelemetry) {

public abstract void afterTestClass();

public abstract void forceFlush();

public abstract void clearAllExportedData();

public abstract OpenTelemetry getOpenTelemetry();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* An implementation of {@link InstrumentationTestRunner} that initializes OpenTelemetry SDK and
Expand Down Expand Up @@ -87,6 +89,16 @@ public void beforeTestClass() {
@Override
public void afterTestClass() {}

@Override
public void forceFlush() {
List<CompletableResultCode> results =
Arrays.asList(
openTelemetry.getSdkTracerProvider().forceFlush(),
openTelemetry.getSdkMeterProvider().forceFlush(),
openTelemetry.getSdkLogEmitterProvider().forceFlush());
CompletableResultCode.ofAll(results).join(10, TimeUnit.SECONDS);
}

@Override
public void clearAllExportedData() {
testSpanExporter.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public final class AgentTestingExporterAccess {
private static final MethodHandle getSpanExportRequests;
private static final MethodHandle getMetricExportRequests;
private static final MethodHandle getLogExportRequests;
private static final MethodHandle forceFlush;
private static final MethodHandle reset;
private static final MethodHandle forceFlushCalled;

Expand All @@ -108,6 +109,9 @@ public final class AgentTestingExporterAccess {
agentTestingExporterFactoryClass,
"getLogExportRequests",
MethodType.methodType(List.class));
forceFlush =
lookup.findStatic(
agentTestingExporterFactoryClass, "forceFlush", MethodType.methodType(void.class));
reset =
lookup.findStatic(
agentTestingExporterFactoryClass, "reset", MethodType.methodType(void.class));
Expand All @@ -121,6 +125,14 @@ public final class AgentTestingExporterAccess {
}
}

public static void forceFlush() {
try {
forceFlush.invokeExact();
} catch (Throwable t) {
throw new AssertionError("Could not invoke forceFlush", t);
}
}

public static void reset() {
try {
reset.invokeExact();
Expand Down
4 changes: 1 addition & 3 deletions testing/agent-exporter/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ dependencies {
compileOnly(project(":instrumentation-appender-api-internal"))
compileOnly(project(":instrumentation-appender-sdk-internal"))

implementation("io.grpc:grpc-core:1.33.1")
implementation("io.grpc:grpc-protobuf:1.33.1")
implementation("io.grpc:grpc-stub:1.33.1")
implementation("com.linecorp.armeria:armeria-grpc-protocol:1.14.0")
implementation("io.opentelemetry:opentelemetry-exporter-otlp")
implementation("io.opentelemetry:opentelemetry-exporter-otlp-metrics")
implementation("io.opentelemetry:opentelemetry-exporter-otlp-logs")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

package io.opentelemetry.javaagent.testing.exporter;

import io.opentelemetry.sdk.common.CompletableResultCode;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class AgentTestingExporterFactory {

Expand All @@ -25,6 +28,15 @@ public static List<byte[]> getLogExportRequests() {
return logExporter.getCollectedExportRequests();
}

public static void forceFlush() {
// TODO(anuraaga): Flush metrics too.
List<CompletableResultCode> results =
Arrays.asList(
AgentTestingTracingCustomizer.spanProcessor.forceFlush(),
AgentTestingLogsCustomizer.logProcessor.forceFlush());
CompletableResultCode.ofAll(results).join(10, TimeUnit.SECONDS);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WDYT about using OpenTelemetrySdkAccess instead? It flushes the meter provider too

Copy link
Contributor Author

@anuraaga anuraaga Feb 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops realized that we don't want to flush metrics here anyways (there's no such thing as pending metric exports really, all exports happen at random times and are valid)

}

public static void reset() {
spanExporter.reset();
metricExporter.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,24 @@
import io.opentelemetry.javaagent.extension.AgentListener;
import io.opentelemetry.javaagent.instrumentation.api.appender.internal.AgentLogEmitterProvider;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import io.opentelemetry.sdk.logs.LogProcessor;
import io.opentelemetry.sdk.logs.SdkLogEmitterProvider;
import io.opentelemetry.sdk.logs.export.BatchLogProcessor;
import io.opentelemetry.sdk.logs.export.SimpleLogProcessor;

@AutoService(AgentListener.class)
public class AgentTestingLogsCustomizer implements AgentListener {

static final LogProcessor logProcessor =
anuraaga marked this conversation as resolved.
Show resolved Hide resolved
SimpleLogProcessor.create(AgentTestingExporterFactory.logExporter);

@Override
public void beforeAgent(
Config config, AutoConfiguredOpenTelemetrySdk autoConfiguredOpenTelemetrySdk) {

SdkLogEmitterProvider logEmitterProvider =
SdkLogEmitterProvider.builder()
.setResource(autoConfiguredOpenTelemetrySdk.getResource())
.addLogProcessor(
BatchLogProcessor.builder(AgentTestingExporterFactory.logExporter).build())
.addLogProcessor(logProcessor)
.build();

AgentLogEmitterProvider.resetForTest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

@AutoService(SdkMeterProviderConfigurer.class)
public class AgentTestingMetricsCustomizer implements SdkMeterProviderConfigurer {

@Override
public void configure(
SdkMeterProviderBuilder sdkMeterProviderBuilder, ConfigProperties configProperties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,23 @@

package io.opentelemetry.javaagent.testing.exporter;

import io.grpc.Server;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import static java.util.concurrent.CompletableFuture.completedFuture;

import com.google.protobuf.InvalidProtocolBufferException;
import com.linecorp.armeria.common.grpc.protocol.ArmeriaStatusException;
import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.grpc.protocol.AbstractUnaryGrpcService;
import io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogExporter;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse;
import io.opentelemetry.proto.collector.logs.v1.LogsServiceGrpc;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.data.LogData;
import io.opentelemetry.sdk.logs.export.LogExporter;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -48,22 +50,17 @@ void reset() {
private final LogExporter delegate;

OtlpInMemoryLogExporter() {
String serverName = InProcessServerBuilder.generateName();

collector =
InProcessServerBuilder.forName(serverName)
.directExecutor()
.addService(new InMemoryOtlpCollector())
Server.builder()
.service(
"/opentelemetry.proto.collector.logs.v1.LogsService/Export",
new InMemoryOtlpCollector())
.build();
try {
collector.start();
} catch (IOException e) {
throw new AssertionError("Could not start in-process collector.", e);
}
collector.start().join();

delegate =
OtlpGrpcLogExporter.builder()
.setChannel(InProcessChannelBuilder.forName(serverName).directExecutor().build())
.setEndpoint("http://localhost:" + collector.activeLocalPort())
.build();
}

Expand All @@ -82,19 +79,22 @@ public CompletableResultCode flush() {

@Override
public CompletableResultCode shutdown() {
collector.shutdown();
collector.stop();
return delegate.shutdown();
}

private class InMemoryOtlpCollector extends LogsServiceGrpc.LogsServiceImplBase {
private final class InMemoryOtlpCollector extends AbstractUnaryGrpcService {

private final byte[] response = ExportLogsServiceResponse.getDefaultInstance().toByteArray();

@Override
public void export(
ExportLogsServiceRequest request,
StreamObserver<ExportLogsServiceResponse> responseObserver) {
collectedRequests.add(request);
responseObserver.onNext(ExportLogsServiceResponse.getDefaultInstance());
responseObserver.onCompleted();
protected CompletionStage<byte[]> handleMessage(ServiceRequestContext ctx, byte[] message) {
try {
collectedRequests.add(ExportLogsServiceRequest.parseFrom(message));
} catch (InvalidProtocolBufferException e) {
throw new ArmeriaStatusException(3, e.getMessage(), e);
}
return completedFuture(response);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,23 @@

package io.opentelemetry.javaagent.testing.exporter;

import io.grpc.Server;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import static java.util.concurrent.CompletableFuture.completedFuture;

import com.google.protobuf.InvalidProtocolBufferException;
import com.linecorp.armeria.common.grpc.protocol.ArmeriaStatusException;
import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.grpc.protocol.AbstractUnaryGrpcService;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -48,22 +50,17 @@ void reset() {
private final MetricExporter delegate;

OtlpInMemoryMetricExporter() {
String serverName = InProcessServerBuilder.generateName();

collector =
InProcessServerBuilder.forName(serverName)
.directExecutor()
.addService(new InMemoryOtlpCollector())
Server.builder()
.service(
"/opentelemetry.proto.collector.metrics.v1.MetricsService/Export",
new InMemoryOtlpCollector())
.build();
try {
collector.start();
} catch (IOException e) {
throw new AssertionError("Could not start in-process collector.", e);
}
collector.start().join();

delegate =
OtlpGrpcMetricExporter.builder()
.setChannel(InProcessChannelBuilder.forName(serverName).directExecutor().build())
.setEndpoint("http://localhost:" + collector.activeLocalPort())
.build();
}

Expand All @@ -79,19 +76,22 @@ public CompletableResultCode flush() {

@Override
public CompletableResultCode shutdown() {
collector.shutdown();
collector.stop();
return delegate.shutdown();
}

private class InMemoryOtlpCollector extends MetricsServiceGrpc.MetricsServiceImplBase {
private final class InMemoryOtlpCollector extends AbstractUnaryGrpcService {

private final byte[] response = ExportMetricsServiceResponse.getDefaultInstance().toByteArray();

@Override
public void export(
ExportMetricsServiceRequest request,
StreamObserver<ExportMetricsServiceResponse> responseObserver) {
collectedRequests.add(request);
responseObserver.onNext(ExportMetricsServiceResponse.getDefaultInstance());
responseObserver.onCompleted();
protected CompletionStage<byte[]> handleMessage(ServiceRequestContext ctx, byte[] message) {
try {
collectedRequests.add(ExportMetricsServiceRequest.parseFrom(message));
} catch (InvalidProtocolBufferException e) {
throw new ArmeriaStatusException(3, e.getMessage(), e);
}
return completedFuture(response);
}
}
}
Loading