Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Onboard ServiceBus onto new tracing #33663

Merged
merged 19 commits into from
Mar 27, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ private static Map<String, String> getMappings() {
// messaging mapping, attributes are defined in com.azure.core.amqp.implementation.ClientConstants and in EventHubs, ServiceBus
// metric helpers
mappings.put("status", "otel.status_code");
mappings.put("entityName", "messaging.destination");
mappings.put("entityName", "messaging.destination.name");
mappings.put("entityPath", "messaging.az.entity_path");
mappings.put("hostName", "net.peer.name");
mappings.put("errorCondition", "amqp.error_condition");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void attributeMappings() {
assertEquals(13, attributes.size());
assertEquals("value", attributes.get(AttributeKey.stringKey("foobar")));
assertEquals("host", attributes.get(AttributeKey.stringKey("net.peer.name")));
assertEquals("entity", attributes.get(AttributeKey.stringKey("messaging.destination")));
assertEquals("entity", attributes.get(AttributeKey.stringKey("messaging.destination.name")));
assertEquals("path", attributes.get(AttributeKey.stringKey("messaging.az.entity_path")));
assertEquals("amqp::error::code", attributes.get(AttributeKey.stringKey("amqp.error_condition")));
assertEquals("rejected", attributes.get(AttributeKey.stringKey("amqp.delivery_state")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

Expand All @@ -21,47 +19,50 @@
class OpenTelemetryUtils {
private static final ClientLogger LOGGER = new ClientLogger(OpenTelemetryUtils.class);

private static final Map<String, String> ATTRIBUTE_MAPPING_V1_17_0 = getMappingsV1200();
static final String SERVICE_REQUEST_ID_ATTRIBUTE = "serviceRequestId";
static final String CLIENT_REQUEST_ID_ATTRIBUTE = "requestId";

private static Map<String, String> getMappingsV1200() {
Map<String, String> mappings = new HashMap<>(8);
// messaging mapping, attributes are defined in com.azure.core.amqp.implementation.ClientConstants
mappings.put(ENTITY_PATH_KEY, "messaging.destination.name");
mappings.put(HOST_NAME_KEY, "net.peer.name");
mappings.put(CLIENT_REQUEST_ID_ATTRIBUTE, "az.client_request_id");
mappings.put(SERVICE_REQUEST_ID_ATTRIBUTE, "az.service_request_id");

return Collections.unmodifiableMap(mappings);
}

public static Attributes convert(Map<String, Object> attributeMap, OpenTelemetrySchemaVersion schemaVersion) {
if (attributeMap == null || attributeMap.isEmpty()) {
return Attributes.empty();
}

Map<String, String> mappings = getMappingsForVersion(schemaVersion);

AttributesBuilder builder = Attributes.builder();
for (Map.Entry<String, Object> kvp : attributeMap.entrySet()) {
if (kvp.getValue() == null) {
continue;
}

addAttribute(builder, mappings.getOrDefault(kvp.getKey(), kvp.getKey()), kvp.getValue());
addAttribute(builder, mapAttributeName(kvp.getKey(), schemaVersion), kvp.getValue());
}

return builder.build();
}

private static Map<String, String> getMappingsForVersion(OpenTelemetrySchemaVersion version) {
private static String mapAttributeName(String name, OpenTelemetrySchemaVersion version) {
if (version == OpenTelemetrySchemaVersion.V1_17_0) {
return ATTRIBUTE_MAPPING_V1_17_0;
return mapAttributeNameV1170(name);
}

LOGGER.verbose("Unknown OpenTelemetry Semantic Conventions version: {}, using latest instead: {}", version, OpenTelemetrySchemaVersion.getLatest());
return getMappingsForVersion(OpenTelemetrySchemaVersion.getLatest());
return mapAttributeNameV1170(name);
}

private static String mapAttributeNameV1170(String name) {
if (ENTITY_PATH_KEY.equals(name)) {
return "messaging.destination.name";
}
if (HOST_NAME_KEY.equals(name)) {
return "net.peer.name";
}
if (CLIENT_REQUEST_ID_ATTRIBUTE.equals(name)) {
return "az.client_request_id";
}
if (SERVICE_REQUEST_ID_ATTRIBUTE.equals(name)) {
return "az.service_request_id";
}
return name;
}

/**
Expand Down Expand Up @@ -107,7 +108,7 @@ private static void addAttribute(AttributesBuilder attributesBuilder, String key
static void addAttribute(Span span, String key, Object value, OpenTelemetrySchemaVersion schemaVersion) {
Objects.requireNonNull(key, "OpenTelemetry attribute name cannot be null.");

key = getMappingsForVersion(schemaVersion).getOrDefault(key, key);
key = mapAttributeName(key, schemaVersion);
if (value instanceof String) {
span.setAttribute(AttributeKey.stringKey(key), (String) value);
} else if (value instanceof Long) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ protected void hookOnNext(ServiceBusMessageContext message) {
Context span = instrumentation.instrumentProcess("ServiceBus.process", message.getMessage(), Context.NONE);
message.getMessage().setContext(span);

AutoCloseable scope = tracer.makeSpanCurrent(span);
try {
downstream.onNext(message);
} catch (Throwable t) {
Expand All @@ -72,7 +73,7 @@ protected void hookOnNext(ServiceBusMessageContext message) {
exception = (Throwable) processorException;
}
}
tracer.endSpan(exception, context, null);
tracer.endSpan(exception, context, scope);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.metrics.Meter;
import com.azure.core.util.metrics.MeterProvider;
import com.azure.core.util.tracing.Tracer;
import com.azure.core.util.tracing.TracerProvider;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
import com.azure.messaging.servicebus.implementation.ServiceBusAmqpConnection;
Expand All @@ -42,7 +44,6 @@
import com.azure.messaging.servicebus.implementation.ServiceBusReactorAmqpConnection;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
import com.azure.messaging.servicebus.implementation.ServiceBusSharedKeyCredential;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import com.azure.messaging.servicebus.implementation.ServiceBusProcessorClientOptions;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import com.azure.messaging.servicebus.models.SubQueue;
Expand All @@ -64,6 +65,7 @@
import java.util.regex.Pattern;

import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_PATH_KEY;
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_NAMESPACE_VALUE;

/**
* The builder to create Service Bus clients:
Expand Down Expand Up @@ -209,7 +211,6 @@ public final class ServiceBusClientBuilder implements
private static final Pattern HOST_PORT_PATTERN = Pattern.compile("^[^:]+:\\d+");
private static final Duration MAX_LOCK_RENEW_DEFAULT_DURATION = Duration.ofMinutes(5);
private static final ClientLogger LOGGER = new ClientLogger(ServiceBusClientBuilder.class);

private final Object connectionLock = new Object();
private final MessageSerializer messageSerializer = new ServiceBusMessageSerializer();
private ClientOptions clientOptions;
Expand Down Expand Up @@ -932,8 +933,8 @@ public ServiceBusSenderAsyncClient buildAsyncClient() {
clientIdentifier = UUID.randomUUID().toString();
}

final ServiceBusSenderInstrumentation instrumentation = new ServiceBusSenderInstrumentation(ServiceBusTracer.getDefaultTracer(),
createMeter(), connectionProcessor.getFullyQualifiedNamespace(), entityName);
final ServiceBusSenderInstrumentation instrumentation = new ServiceBusSenderInstrumentation(
createTracer(), createMeter(), connectionProcessor.getFullyQualifiedNamespace(), entityName);

return new ServiceBusSenderAsyncClient(entityName, entityType, connectionProcessor, retryOptions,
instrumentation, messageSerializer, ServiceBusClientBuilder.this::onClientClose, null, clientIdentifier);
Expand Down Expand Up @@ -1418,7 +1419,7 @@ ServiceBusReceiverAsyncClient buildAsyncClientForProcessor() {
connectionProcessor, messageSerializer, receiverOptions, clientIdentifier);

final ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(
ServiceBusTracer.getDefaultTracer(), createMeter(), connectionProcessor.getFullyQualifiedNamespace(),
createTracer(), createMeter(), connectionProcessor.getFullyQualifiedNamespace(),
entityPath, subscriptionName, false);
return new ServiceBusReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath,
entityType, receiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT,
Expand Down Expand Up @@ -1494,8 +1495,8 @@ private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoComp
clientIdentifier = UUID.randomUUID().toString();
}

final ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(ServiceBusTracer.getDefaultTracer(),
createMeter(), connectionProcessor.getFullyQualifiedNamespace(), entityPath, subscriptionName, syncConsumer);
final ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(
createTracer(), createMeter(), connectionProcessor.getFullyQualifiedNamespace(), entityPath, subscriptionName, syncConsumer);
return new ServiceBusSessionReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(),
entityPath, entityType, receiverOptions, connectionProcessor, instrumentation, messageSerializer,
ServiceBusClientBuilder.this::onClientClose, clientIdentifier);
Expand Down Expand Up @@ -1977,8 +1978,8 @@ ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed, bo
clientIdentifier = UUID.randomUUID().toString();
}

final ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(ServiceBusTracer.getDefaultTracer(),
createMeter(), connectionProcessor.getFullyQualifiedNamespace(), entityPath, subscriptionName, syncConsumer);
final ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(
createTracer(), createMeter(), connectionProcessor.getFullyQualifiedNamespace(), entityPath, subscriptionName, syncConsumer);
return new ServiceBusReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath,
entityType, receiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT,
instrumentation, messageSerializer, ServiceBusClientBuilder.this::onClientClose, clientIdentifier);
Expand Down Expand Up @@ -2074,4 +2075,9 @@ private Meter createMeter() {
return MeterProvider.getDefaultProvider().createMeter(LIBRARY_NAME, LIBRARY_VERSION,
clientOptions == null ? null : clientOptions.getMetricsOptions());
}

private Tracer createTracer() {
return TracerProvider.getDefaultProvider().createTracer(LIBRARY_NAME, LIBRARY_VERSION,
AZ_TRACING_NAMESPACE_VALUE, clientOptions == null ? null : clientOptions.getTracingOptions());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpSession;
import com.azure.core.amqp.AmqpTransaction;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.MessageSerializer;
Expand Down Expand Up @@ -834,15 +835,11 @@ public Flux<ServiceBusReceivedMessage> receiveMessages() {
Flux<ServiceBusReceivedMessage> receiveMessagesNoBackPressure() {
return receiveMessagesWithContext(0)
.handle((serviceBusMessageContext, sink) -> {
try (AutoCloseable scope = tracer.makeSpanCurrent(serviceBusMessageContext.getMessage().getContext())) {
if (serviceBusMessageContext.hasError()) {
sink.error(serviceBusMessageContext.getThrowable());
return;
}
sink.next(serviceBusMessageContext.getMessage());
} catch (Exception ex) {
LOGGER.verbose("Error disposing scope", ex);
if (serviceBusMessageContext.hasError()) {
sink.error(serviceBusMessageContext.getThrowable());
return;
}
sink.next(serviceBusMessageContext.getMessage());
});
}

Expand Down Expand Up @@ -1199,7 +1196,7 @@ public Mono<ServiceBusTransactionContext> createTransaction() {

return tracer.traceMono("ServiceBus.commitTransaction", connectionProcessor
.flatMap(connection -> connection.createSession(TRANSACTION_LINK_NAME))
.flatMap(transactionSession -> transactionSession.createTransaction())
.flatMap(AmqpSession::createTransaction)
.map(transaction -> new ServiceBusTransactionContext(transaction.getTransactionId())))
.onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RECEIVE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ public Flux<Long> scheduleMessages(Iterable<ServiceBusMessage> messages, OffsetD
return messageBatch;
})
.flatMapMany(messageBatch ->
tracer.traceFluxWithLinks("ServiceBus.scheduleMessages",
tracer.traceScheduleFlux("ServiceBus.scheduleMessages",
connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityName, entityType))
.flatMapMany(managementNode -> managementNode.schedule(messageBatch.getMessages(), scheduledEnqueueTime,
Expand Down Expand Up @@ -703,7 +703,7 @@ private Mono<Long> scheduleMessageInternal(ServiceBusMessage message, OffsetDate
return monoError(LOGGER, new NullPointerException("'scheduledEnqueueTime' cannot be null."));
}

return tracer.traceMonoWithLink("ServiceBus.scheduleMessage",
return tracer.traceScheduleMono("ServiceBus.scheduleMessage",
getSendLink().flatMap(link -> link.getLinkSize().flatMap(size -> {
int maxSize = size > 0
? size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,12 @@ <T> Mono<T> instrumentSendBatch(String spanName, Mono<T> publisher, List<Service
meter.reportBatchSend(batch.size(), signal.getThrowable(), span);
tracer.endSpan(signal.getThrowable(), span, null);
})
.contextWrite(reactor.util.context.Context.of(REACTOR_PARENT_TRACE_CONTEXT_KEY, tracer.startSpanWithLinks(spanName, batch,
.contextWrite(reactor.util.context.Context.of(REACTOR_PARENT_TRACE_CONTEXT_KEY, tracer.startSpanWithLinks(spanName,
ServiceBusTracer.OperationName.PUBLISH, batch,
ServiceBusMessage::getContext, Context.NONE)));
} else {
return publisher
.doOnEach(signal -> {
meter.reportBatchSend(batch.size(), signal.getThrowable(), Context.NONE);
});
.doOnEach(signal -> meter.reportBatchSend(batch.size(), signal.getThrowable(), Context.NONE));
}
}
}
Loading