Skip to content

Commit

Permalink
Onboard ServiceBus onto new tracing (#33663)
Browse files Browse the repository at this point in the history
* Onboard ServiceBus onto new tracing and fix tests

* update sb admin client
* more test hardening
* more admin tests fixes
  • Loading branch information
lmolkova authored Mar 27, 2023
1 parent 95129cc commit 536c395
Show file tree
Hide file tree
Showing 20 changed files with 494 additions and 431 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ private static Map<String, String> getMappings() {
// messaging mapping, attributes are defined in com.azure.core.amqp.implementation.ClientConstants and in EventHubs, ServiceBus
// metric helpers
mappings.put("status", "otel.status_code");
mappings.put("entityName", "messaging.destination");
mappings.put("entityName", "messaging.destination.name");
mappings.put("entityPath", "messaging.az.entity_path");
mappings.put("hostName", "net.peer.name");
mappings.put("errorCondition", "amqp.error_condition");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void attributeMappings() {
assertEquals(13, attributes.size());
assertEquals("value", attributes.get(AttributeKey.stringKey("foobar")));
assertEquals("host", attributes.get(AttributeKey.stringKey("net.peer.name")));
assertEquals("entity", attributes.get(AttributeKey.stringKey("messaging.destination")));
assertEquals("entity", attributes.get(AttributeKey.stringKey("messaging.destination.name")));
assertEquals("path", attributes.get(AttributeKey.stringKey("messaging.az.entity_path")));
assertEquals("amqp::error::code", attributes.get(AttributeKey.stringKey("amqp.error_condition")));
assertEquals("rejected", attributes.get(AttributeKey.stringKey("amqp.delivery_state")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

Expand All @@ -21,47 +19,50 @@
class OpenTelemetryUtils {
private static final ClientLogger LOGGER = new ClientLogger(OpenTelemetryUtils.class);

private static final Map<String, String> ATTRIBUTE_MAPPING_V1_17_0 = getMappingsV1200();
static final String SERVICE_REQUEST_ID_ATTRIBUTE = "serviceRequestId";
static final String CLIENT_REQUEST_ID_ATTRIBUTE = "requestId";

private static Map<String, String> getMappingsV1200() {
Map<String, String> mappings = new HashMap<>(8);
// messaging mapping, attributes are defined in com.azure.core.amqp.implementation.ClientConstants
mappings.put(ENTITY_PATH_KEY, "messaging.destination.name");
mappings.put(HOST_NAME_KEY, "net.peer.name");
mappings.put(CLIENT_REQUEST_ID_ATTRIBUTE, "az.client_request_id");
mappings.put(SERVICE_REQUEST_ID_ATTRIBUTE, "az.service_request_id");

return Collections.unmodifiableMap(mappings);
}

public static Attributes convert(Map<String, Object> attributeMap, OpenTelemetrySchemaVersion schemaVersion) {
if (attributeMap == null || attributeMap.isEmpty()) {
return Attributes.empty();
}

Map<String, String> mappings = getMappingsForVersion(schemaVersion);

AttributesBuilder builder = Attributes.builder();
for (Map.Entry<String, Object> kvp : attributeMap.entrySet()) {
if (kvp.getValue() == null) {
continue;
}

addAttribute(builder, mappings.getOrDefault(kvp.getKey(), kvp.getKey()), kvp.getValue());
addAttribute(builder, mapAttributeName(kvp.getKey(), schemaVersion), kvp.getValue());
}

return builder.build();
}

private static Map<String, String> getMappingsForVersion(OpenTelemetrySchemaVersion version) {
private static String mapAttributeName(String name, OpenTelemetrySchemaVersion version) {
if (version == OpenTelemetrySchemaVersion.V1_17_0) {
return ATTRIBUTE_MAPPING_V1_17_0;
return mapAttributeNameV1170(name);
}

LOGGER.verbose("Unknown OpenTelemetry Semantic Conventions version: {}, using latest instead: {}", version, OpenTelemetrySchemaVersion.getLatest());
return getMappingsForVersion(OpenTelemetrySchemaVersion.getLatest());
return mapAttributeNameV1170(name);
}

private static String mapAttributeNameV1170(String name) {
if (ENTITY_PATH_KEY.equals(name)) {
return "messaging.destination.name";
}
if (HOST_NAME_KEY.equals(name)) {
return "net.peer.name";
}
if (CLIENT_REQUEST_ID_ATTRIBUTE.equals(name)) {
return "az.client_request_id";
}
if (SERVICE_REQUEST_ID_ATTRIBUTE.equals(name)) {
return "az.service_request_id";
}
return name;
}

/**
Expand Down Expand Up @@ -107,7 +108,7 @@ private static void addAttribute(AttributesBuilder attributesBuilder, String key
static void addAttribute(Span span, String key, Object value, OpenTelemetrySchemaVersion schemaVersion) {
Objects.requireNonNull(key, "OpenTelemetry attribute name cannot be null.");

key = getMappingsForVersion(schemaVersion).getOrDefault(key, key);
key = mapAttributeName(key, schemaVersion);
if (value instanceof String) {
span.setAttribute(AttributeKey.stringKey(key), (String) value);
} else if (value instanceof Long) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ protected void hookOnNext(ServiceBusMessageContext message) {
Context span = instrumentation.instrumentProcess("ServiceBus.process", message.getMessage(), Context.NONE);
message.getMessage().setContext(span);

AutoCloseable scope = tracer.makeSpanCurrent(span);
try {
downstream.onNext(message);
} catch (Throwable t) {
Expand All @@ -72,7 +73,7 @@ protected void hookOnNext(ServiceBusMessageContext message) {
exception = (Throwable) processorException;
}
}
tracer.endSpan(exception, context, null);
tracer.endSpan(exception, context, scope);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.metrics.Meter;
import com.azure.core.util.metrics.MeterProvider;
import com.azure.core.util.tracing.Tracer;
import com.azure.core.util.tracing.TracerProvider;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
import com.azure.messaging.servicebus.implementation.ServiceBusAmqpConnection;
Expand All @@ -42,7 +44,6 @@
import com.azure.messaging.servicebus.implementation.ServiceBusReactorAmqpConnection;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
import com.azure.messaging.servicebus.implementation.ServiceBusSharedKeyCredential;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import com.azure.messaging.servicebus.implementation.ServiceBusProcessorClientOptions;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import com.azure.messaging.servicebus.models.SubQueue;
Expand All @@ -64,6 +65,7 @@
import java.util.regex.Pattern;

import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_PATH_KEY;
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_NAMESPACE_VALUE;

/**
* The builder to create Service Bus clients:
Expand Down Expand Up @@ -209,7 +211,6 @@ public final class ServiceBusClientBuilder implements
private static final Pattern HOST_PORT_PATTERN = Pattern.compile("^[^:]+:\\d+");
private static final Duration MAX_LOCK_RENEW_DEFAULT_DURATION = Duration.ofMinutes(5);
private static final ClientLogger LOGGER = new ClientLogger(ServiceBusClientBuilder.class);

private final Object connectionLock = new Object();
private final MessageSerializer messageSerializer = new ServiceBusMessageSerializer();
private ClientOptions clientOptions;
Expand Down Expand Up @@ -932,8 +933,8 @@ public ServiceBusSenderAsyncClient buildAsyncClient() {
clientIdentifier = UUID.randomUUID().toString();
}

final ServiceBusSenderInstrumentation instrumentation = new ServiceBusSenderInstrumentation(ServiceBusTracer.getDefaultTracer(),
createMeter(), connectionProcessor.getFullyQualifiedNamespace(), entityName);
final ServiceBusSenderInstrumentation instrumentation = new ServiceBusSenderInstrumentation(
createTracer(), createMeter(), connectionProcessor.getFullyQualifiedNamespace(), entityName);

return new ServiceBusSenderAsyncClient(entityName, entityType, connectionProcessor, retryOptions,
instrumentation, messageSerializer, ServiceBusClientBuilder.this::onClientClose, null, clientIdentifier);
Expand Down Expand Up @@ -1418,7 +1419,7 @@ ServiceBusReceiverAsyncClient buildAsyncClientForProcessor() {
connectionProcessor, messageSerializer, receiverOptions, clientIdentifier);

final ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(
ServiceBusTracer.getDefaultTracer(), createMeter(), connectionProcessor.getFullyQualifiedNamespace(),
createTracer(), createMeter(), connectionProcessor.getFullyQualifiedNamespace(),
entityPath, subscriptionName, false);
return new ServiceBusReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath,
entityType, receiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT,
Expand Down Expand Up @@ -1494,8 +1495,8 @@ private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoComp
clientIdentifier = UUID.randomUUID().toString();
}

final ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(ServiceBusTracer.getDefaultTracer(),
createMeter(), connectionProcessor.getFullyQualifiedNamespace(), entityPath, subscriptionName, syncConsumer);
final ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(
createTracer(), createMeter(), connectionProcessor.getFullyQualifiedNamespace(), entityPath, subscriptionName, syncConsumer);
return new ServiceBusSessionReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(),
entityPath, entityType, receiverOptions, connectionProcessor, instrumentation, messageSerializer,
ServiceBusClientBuilder.this::onClientClose, clientIdentifier);
Expand Down Expand Up @@ -1977,8 +1978,8 @@ ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed, bo
clientIdentifier = UUID.randomUUID().toString();
}

final ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(ServiceBusTracer.getDefaultTracer(),
createMeter(), connectionProcessor.getFullyQualifiedNamespace(), entityPath, subscriptionName, syncConsumer);
final ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(
createTracer(), createMeter(), connectionProcessor.getFullyQualifiedNamespace(), entityPath, subscriptionName, syncConsumer);
return new ServiceBusReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath,
entityType, receiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT,
instrumentation, messageSerializer, ServiceBusClientBuilder.this::onClientClose, clientIdentifier);
Expand Down Expand Up @@ -2074,4 +2075,9 @@ private Meter createMeter() {
return MeterProvider.getDefaultProvider().createMeter(LIBRARY_NAME, LIBRARY_VERSION,
clientOptions == null ? null : clientOptions.getMetricsOptions());
}

private Tracer createTracer() {
return TracerProvider.getDefaultProvider().createTracer(LIBRARY_NAME, LIBRARY_VERSION,
AZ_TRACING_NAMESPACE_VALUE, clientOptions == null ? null : clientOptions.getTracingOptions());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpSession;
import com.azure.core.amqp.AmqpTransaction;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.MessageSerializer;
Expand Down Expand Up @@ -834,15 +835,11 @@ public Flux<ServiceBusReceivedMessage> receiveMessages() {
Flux<ServiceBusReceivedMessage> receiveMessagesNoBackPressure() {
return receiveMessagesWithContext(0)
.handle((serviceBusMessageContext, sink) -> {
try (AutoCloseable scope = tracer.makeSpanCurrent(serviceBusMessageContext.getMessage().getContext())) {
if (serviceBusMessageContext.hasError()) {
sink.error(serviceBusMessageContext.getThrowable());
return;
}
sink.next(serviceBusMessageContext.getMessage());
} catch (Exception ex) {
LOGGER.verbose("Error disposing scope", ex);
if (serviceBusMessageContext.hasError()) {
sink.error(serviceBusMessageContext.getThrowable());
return;
}
sink.next(serviceBusMessageContext.getMessage());
});
}

Expand Down Expand Up @@ -1199,7 +1196,7 @@ public Mono<ServiceBusTransactionContext> createTransaction() {

return tracer.traceMono("ServiceBus.commitTransaction", connectionProcessor
.flatMap(connection -> connection.createSession(TRANSACTION_LINK_NAME))
.flatMap(transactionSession -> transactionSession.createTransaction())
.flatMap(AmqpSession::createTransaction)
.map(transaction -> new ServiceBusTransactionContext(transaction.getTransactionId())))
.onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RECEIVE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ public Flux<Long> scheduleMessages(Iterable<ServiceBusMessage> messages, OffsetD
return messageBatch;
})
.flatMapMany(messageBatch ->
tracer.traceFluxWithLinks("ServiceBus.scheduleMessages",
tracer.traceScheduleFlux("ServiceBus.scheduleMessages",
connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityName, entityType))
.flatMapMany(managementNode -> managementNode.schedule(messageBatch.getMessages(), scheduledEnqueueTime,
Expand Down Expand Up @@ -703,7 +703,7 @@ private Mono<Long> scheduleMessageInternal(ServiceBusMessage message, OffsetDate
return monoError(LOGGER, new NullPointerException("'scheduledEnqueueTime' cannot be null."));
}

return tracer.traceMonoWithLink("ServiceBus.scheduleMessage",
return tracer.traceScheduleMono("ServiceBus.scheduleMessage",
getSendLink().flatMap(link -> link.getLinkSize().flatMap(size -> {
int maxSize = size > 0
? size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,12 @@ <T> Mono<T> instrumentSendBatch(String spanName, Mono<T> publisher, List<Service
meter.reportBatchSend(batch.size(), signal.getThrowable(), span);
tracer.endSpan(signal.getThrowable(), span, null);
})
.contextWrite(reactor.util.context.Context.of(REACTOR_PARENT_TRACE_CONTEXT_KEY, tracer.startSpanWithLinks(spanName, batch,
.contextWrite(reactor.util.context.Context.of(REACTOR_PARENT_TRACE_CONTEXT_KEY, tracer.startSpanWithLinks(spanName,
ServiceBusTracer.OperationName.PUBLISH, batch,
ServiceBusMessage::getContext, Context.NONE)));
} else {
return publisher
.doOnEach(signal -> {
meter.reportBatchSend(batch.size(), signal.getThrowable(), Context.NONE);
});
.doOnEach(signal -> meter.reportBatchSend(batch.size(), signal.getThrowable(), Context.NONE));
}
}
}
Loading

0 comments on commit 536c395

Please sign in to comment.