Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate from gRPC to armeria for testing agent in memory exporter #5314

Closed
wants to merge 14 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,12 @@ class JavaagentTestArgumentsProvider(
"-Dio.opentelemetry.javaagent.slf4j.simpleLogger.log.io.grpc.internal.ServerImplBuilder=INFO",
"-Dio.opentelemetry.javaagent.slf4j.simpleLogger.log.io.grpc.internal.ManagedChannelImplBuilder=INFO",
"-Dio.opentelemetry.javaagent.slf4j.simpleLogger.log.io.perfmark.PerfMark=INFO",
"-Dio.opentelemetry.javaagent.slf4j.simpleLogger.log.io.grpc.Context=INFO"
"-Dio.opentelemetry.javaagent.slf4j.simpleLogger.log.io.grpc.Context=INFO",

// suppress test infrastructure logs
"-Dio.opentelemetry.javaagent.slf4j.simpleLogger.log.com.linecorp.armeria=OFF",
"-Dio.opentelemetry.javaagent.slf4j.simpleLogger.log.okhttp3=OFF",
"-Dio.opentelemetry.javaagent.slf4j.simpleLogger.log.io.netty=OFF",
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* An implementation of {@link InstrumentationTestRunner} that initializes OpenTelemetry SDK and
Expand Down Expand Up @@ -89,6 +91,14 @@ public void afterTestClass() {}

@Override
public void clearAllExportedData() {
// Finish any pending trace or log exports before resetting. There is no such thing as
// "finishing" metrics so we don't flush it here.
List<CompletableResultCode> results =
Arrays.asList(
openTelemetry.getSdkTracerProvider().forceFlush(),
openTelemetry.getSdkLogEmitterProvider().forceFlush());
CompletableResultCode.ofAll(results).join(10, TimeUnit.SECONDS);

testSpanExporter.reset();
testMetricExporter.reset();
forceFlushCalled = false;
Expand All @@ -105,6 +115,7 @@ public OpenTelemetrySdk getOpenTelemetrySdk() {

@Override
public List<SpanData> getExportedSpans() {
openTelemetry.getSdkTracerProvider().forceFlush().join(10, TimeUnit.SECONDS);
return testSpanExporter.getFinishedSpanItems();
}

Expand Down
4 changes: 1 addition & 3 deletions testing/agent-exporter/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ dependencies {
compileOnly(project(":instrumentation-appender-api-internal"))
compileOnly(project(":instrumentation-appender-sdk-internal"))

implementation("io.grpc:grpc-core:1.33.1")
implementation("io.grpc:grpc-protobuf:1.33.1")
implementation("io.grpc:grpc-stub:1.33.1")
implementation("com.linecorp.armeria:armeria-grpc-protocol:1.14.0")
implementation("io.opentelemetry:opentelemetry-exporter-otlp")
implementation("io.opentelemetry:opentelemetry-exporter-otlp-metrics")
implementation("io.opentelemetry:opentelemetry-exporter-otlp-logs")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,32 @@

package io.opentelemetry.javaagent.testing.exporter;

import static java.util.Objects.requireNonNull;

import com.google.auto.service.AutoService;
import io.opentelemetry.instrumentation.api.config.Config;
import io.opentelemetry.javaagent.extension.AgentListener;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizer;
import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizerProvider;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.export.MetricProducer;
import io.opentelemetry.sdk.metrics.export.MetricReader;
import io.opentelemetry.sdk.metrics.export.MetricReaderFactory;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import java.time.Duration;
import javax.annotation.Nullable;

@AutoService(AutoConfigurationCustomizerProvider.class)
public class AgentTestingCustomizer implements AutoConfigurationCustomizerProvider {
@AutoService({AutoConfigurationCustomizerProvider.class, AgentListener.class})
public class AgentTestingCustomizer implements AutoConfigurationCustomizerProvider, AgentListener {

static final AgentTestingSpanProcessor spanProcessor =
new AgentTestingSpanProcessor(
SimpleSpanProcessor.create(AgentTestingExporterFactory.spanExporter));
BatchSpanProcessor.builder(AgentTestingExporterFactory.spanExporter)
.setScheduleDelay(Duration.ofMillis(200))
.build());

static void reset() {
spanProcessor.forceFlushCalled = false;
Expand All @@ -30,9 +43,73 @@ public void customize(AutoConfigurationCustomizer autoConfigurationCustomizer) {

autoConfigurationCustomizer.addMeterProviderCustomizer(
(meterProvider, config) ->
meterProvider.registerMetricReader(
PeriodicMetricReader.builder(AgentTestingExporterFactory.metricExporter)
.setInterval(Duration.ofMillis(100))
.newMetricReaderFactory()));
meterProvider.registerMetricReader(StartableMetricReader.INSTANCE));
}

@Override
public void afterAgent(
Config config, AutoConfiguredOpenTelemetrySdk autoConfiguredOpenTelemetrySdk) {
StartableMetricReader.INSTANCE.start();
}

@SuppressWarnings("ImmutableEnumChecker")
private enum StartableMetricReader implements MetricReaderFactory, MetricReader {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a brief comment explaining why this is needed (or just pointing to the PR discussion)?

INSTANCE;

@Nullable private volatile MetricProducer metricProducer;

private volatile MetricReader delegate = NoopMetricReader.INSTANCE;

void start() {
MetricProducer metricProducer = this.metricProducer;
requireNonNull(metricProducer);
delegate =
PeriodicMetricReader.builder(AgentTestingExporterFactory.metricExporter)
.setInterval(Duration.ofMillis(300))
.newMetricReaderFactory()
.apply(metricProducer);
}

@Nullable
@Override
public AggregationTemporality getPreferredTemporality() {
return delegate.getPreferredTemporality();
}

@Override
public CompletableResultCode flush() {
return delegate.flush();
}

@Override
public CompletableResultCode shutdown() {
return delegate.shutdown();
}

@Override
public MetricReader apply(MetricProducer metricProducer) {
this.metricProducer = metricProducer;
return this;
}
}

private enum NoopMetricReader implements MetricReader {
INSTANCE;

@Nullable
@Override
public AggregationTemporality getPreferredTemporality() {
return null;
}

@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode shutdown() {
return CompletableResultCode.ofSuccess();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,43 @@

package io.opentelemetry.javaagent.testing.exporter;

import io.opentelemetry.sdk.common.CompletableResultCode;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class AgentTestingExporterFactory {

static final OtlpInMemorySpanExporter spanExporter = new OtlpInMemorySpanExporter();
static final OtlpInMemoryMetricExporter metricExporter = new OtlpInMemoryMetricExporter();
static final OtlpInMemoryLogExporter logExporter = new OtlpInMemoryLogExporter();
static final OtlpInMemoryCollector collector = new OtlpInMemoryCollector();

static final OtlpInMemorySpanExporter spanExporter = new OtlpInMemorySpanExporter(collector);
static final OtlpInMemoryMetricExporter metricExporter =
new OtlpInMemoryMetricExporter(collector);
static final OtlpInMemoryLogExporter logExporter = new OtlpInMemoryLogExporter(collector);

public static List<byte[]> getSpanExportRequests() {
return spanExporter.getCollectedExportRequests();
AgentTestingCustomizer.spanProcessor.forceFlush().join(10, TimeUnit.SECONDS);
return collector.getTraceExportRequests();
}

public static List<byte[]> getMetricExportRequests() {
return metricExporter.getCollectedExportRequests();
return collector.getMetricsExportRequests();
}

public static List<byte[]> getLogExportRequests() {
return logExporter.getCollectedExportRequests();
AgentTestingLogsCustomizer.logProcessor.forceFlush().join(10, TimeUnit.SECONDS);
return collector.getLogsExportRequests();
}

public static void reset() {
spanExporter.reset();
metricExporter.reset();
logExporter.reset();
// Finish any pending trace or log exports before resetting. There is no such thing as
// "finishing" metrics so we don't flush it here.
List<CompletableResultCode> results =
Arrays.asList(
AgentTestingLogsCustomizer.logProcessor.forceFlush(),
AgentTestingCustomizer.spanProcessor.forceFlush());
CompletableResultCode.ofAll(results).join(10, TimeUnit.SECONDS);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WDYT about using OpenTelemetrySdkAccess instead? It flushes the meter provider too

Copy link
Contributor Author

@anuraaga anuraaga Feb 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops realized that we don't want to flush metrics here anyways (there's no such thing as pending metric exports really, all exports happen at random times and are valid)

collector.reset();
}

public static boolean forceFlushCalled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,27 @@
import io.opentelemetry.javaagent.extension.AgentListener;
import io.opentelemetry.javaagent.instrumentation.api.appender.internal.AgentLogEmitterProvider;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import io.opentelemetry.sdk.logs.LogProcessor;
import io.opentelemetry.sdk.logs.SdkLogEmitterProvider;
import io.opentelemetry.sdk.logs.export.BatchLogProcessor;
import java.time.Duration;

@AutoService(AgentListener.class)
public class AgentTestingLogsCustomizer implements AgentListener {

static final LogProcessor logProcessor =
anuraaga marked this conversation as resolved.
Show resolved Hide resolved
BatchLogProcessor.builder(AgentTestingExporterFactory.logExporter)
.setScheduleDelay(Duration.ofMillis(200))
.build();

@Override
public void beforeAgent(
Config config, AutoConfiguredOpenTelemetrySdk autoConfiguredOpenTelemetrySdk) {

SdkLogEmitterProvider logEmitterProvider =
SdkLogEmitterProvider.builder()
.setResource(autoConfiguredOpenTelemetrySdk.getResource())
.addLogProcessor(
BatchLogProcessor.builder(AgentTestingExporterFactory.logExporter).build())
.addLogProcessor(logProcessor)
.build();

AgentLogEmitterProvider.resetForTest();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.testing.exporter;

import static java.util.concurrent.CompletableFuture.completedFuture;

import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.grpc.protocol.AbstractUnaryGrpcService;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;

final class OtlpInMemoryCollector {

private final OtlpService logsService;
private final OtlpService metricsService;
private final OtlpService traceService;

private final Server server;

// To shutdown after all exporters are shutdown;
private int refCnt;

OtlpInMemoryCollector() {
logsService = new OtlpService(ExportLogsServiceResponse.getDefaultInstance().toByteArray());
metricsService =
new OtlpService(ExportMetricsServiceResponse.getDefaultInstance().toByteArray());
traceService = new OtlpService(ExportTraceServiceResponse.getDefaultInstance().toByteArray());
server =
Server.builder()
.service("/opentelemetry.proto.collector.logs.v1.LogsService/Export", logsService)
.service(
"/opentelemetry.proto.collector.metrics.v1.MetricsService/Export", metricsService)
.service("/opentelemetry.proto.collector.trace.v1.TraceService/Export", traceService)
.build();
}

synchronized void start() {
if (refCnt == 0) {
server.start().join();
}
refCnt++;
}

synchronized void stop() {
refCnt--;
if (refCnt == 0) {
server.stop();
}
}

String getEndpoint() {
return "http://localhost:" + server.activeLocalPort();
}

List<byte[]> getLogsExportRequests() {
return logsService.getCollectedRequests();
}

List<byte[]> getMetricsExportRequests() {
return metricsService.getCollectedRequests();
}

List<byte[]> getTraceExportRequests() {
return traceService.getCollectedRequests();
}

void reset() {
logsService.reset();
metricsService.reset();
traceService.reset();
}

private static final class OtlpService extends AbstractUnaryGrpcService {
private final Queue<byte[]> collectedRequests = new ConcurrentLinkedQueue<>();

private final byte[] response;

OtlpService(byte[] response) {
this.response = response;
}

List<byte[]> getCollectedRequests() {
return new ArrayList<>(collectedRequests);
}

void reset() {
collectedRequests.clear();
}

@Override
protected CompletionStage<byte[]> handleMessage(ServiceRequestContext ctx, byte[] message) {
collectedRequests.add(message);
return completedFuture(response);
}
}
}
Loading