diff --git a/sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributes.java b/sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributes.java index c6747e9eb41c8..10acd8efebdcd 100644 --- a/sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributes.java +++ b/sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributes.java @@ -23,7 +23,7 @@ private static Map getMappings() { // messaging mapping, attributes are defined in com.azure.core.amqp.implementation.ClientConstants and in EventHubs, ServiceBus // metric helpers mappings.put("status", "otel.status_code"); - mappings.put("entityName", "messaging.destination"); + mappings.put("entityName", "messaging.destination.name"); mappings.put("entityPath", "messaging.az.entity_path"); mappings.put("hostName", "net.peer.name"); mappings.put("errorCondition", "amqp.error_condition"); diff --git a/sdk/core/azure-core-metrics-opentelemetry/src/test/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributesTests.java b/sdk/core/azure-core-metrics-opentelemetry/src/test/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributesTests.java index 9fb5ce5b83b63..ed1679710832d 100644 --- a/sdk/core/azure-core-metrics-opentelemetry/src/test/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributesTests.java +++ b/sdk/core/azure-core-metrics-opentelemetry/src/test/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributesTests.java @@ -82,7 +82,7 @@ public void attributeMappings() { assertEquals(13, attributes.size()); assertEquals("value", attributes.get(AttributeKey.stringKey("foobar"))); assertEquals("host", attributes.get(AttributeKey.stringKey("net.peer.name"))); - assertEquals("entity", attributes.get(AttributeKey.stringKey("messaging.destination"))); + assertEquals("entity", attributes.get(AttributeKey.stringKey("messaging.destination.name"))); assertEquals("path", attributes.get(AttributeKey.stringKey("messaging.az.entity_path"))); assertEquals("amqp::error::code", attributes.get(AttributeKey.stringKey("amqp.error_condition"))); assertEquals("rejected", attributes.get(AttributeKey.stringKey("amqp.delivery_state"))); diff --git a/sdk/core/azure-core-tracing-opentelemetry/src/main/java/com/azure/core/tracing/opentelemetry/OpenTelemetryUtils.java b/sdk/core/azure-core-tracing-opentelemetry/src/main/java/com/azure/core/tracing/opentelemetry/OpenTelemetryUtils.java index 2f99978a80d3b..667b2d5b57c29 100644 --- a/sdk/core/azure-core-tracing-opentelemetry/src/main/java/com/azure/core/tracing/opentelemetry/OpenTelemetryUtils.java +++ b/sdk/core/azure-core-tracing-opentelemetry/src/main/java/com/azure/core/tracing/opentelemetry/OpenTelemetryUtils.java @@ -10,8 +10,6 @@ import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.StatusCode; -import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -21,47 +19,50 @@ class OpenTelemetryUtils { private static final ClientLogger LOGGER = new ClientLogger(OpenTelemetryUtils.class); - private static final Map ATTRIBUTE_MAPPING_V1_17_0 = getMappingsV1200(); static final String SERVICE_REQUEST_ID_ATTRIBUTE = "serviceRequestId"; static final String CLIENT_REQUEST_ID_ATTRIBUTE = "requestId"; - private static Map getMappingsV1200() { - Map mappings = new HashMap<>(8); - // messaging mapping, attributes are defined in com.azure.core.amqp.implementation.ClientConstants - mappings.put(ENTITY_PATH_KEY, "messaging.destination.name"); - mappings.put(HOST_NAME_KEY, "net.peer.name"); - mappings.put(CLIENT_REQUEST_ID_ATTRIBUTE, "az.client_request_id"); - mappings.put(SERVICE_REQUEST_ID_ATTRIBUTE, "az.service_request_id"); - - return Collections.unmodifiableMap(mappings); - } public static Attributes convert(Map attributeMap, OpenTelemetrySchemaVersion schemaVersion) { if (attributeMap == null || attributeMap.isEmpty()) { return Attributes.empty(); } - Map mappings = getMappingsForVersion(schemaVersion); - AttributesBuilder builder = Attributes.builder(); for (Map.Entry kvp : attributeMap.entrySet()) { if (kvp.getValue() == null) { continue; } - addAttribute(builder, mappings.getOrDefault(kvp.getKey(), kvp.getKey()), kvp.getValue()); + addAttribute(builder, mapAttributeName(kvp.getKey(), schemaVersion), kvp.getValue()); } return builder.build(); } - private static Map getMappingsForVersion(OpenTelemetrySchemaVersion version) { + private static String mapAttributeName(String name, OpenTelemetrySchemaVersion version) { if (version == OpenTelemetrySchemaVersion.V1_17_0) { - return ATTRIBUTE_MAPPING_V1_17_0; + return mapAttributeNameV1170(name); } LOGGER.verbose("Unknown OpenTelemetry Semantic Conventions version: {}, using latest instead: {}", version, OpenTelemetrySchemaVersion.getLatest()); - return getMappingsForVersion(OpenTelemetrySchemaVersion.getLatest()); + return mapAttributeNameV1170(name); + } + + private static String mapAttributeNameV1170(String name) { + if (ENTITY_PATH_KEY.equals(name)) { + return "messaging.destination.name"; + } + if (HOST_NAME_KEY.equals(name)) { + return "net.peer.name"; + } + if (CLIENT_REQUEST_ID_ATTRIBUTE.equals(name)) { + return "az.client_request_id"; + } + if (SERVICE_REQUEST_ID_ATTRIBUTE.equals(name)) { + return "az.service_request_id"; + } + return name; } /** @@ -107,7 +108,7 @@ private static void addAttribute(AttributesBuilder attributesBuilder, String key static void addAttribute(Span span, String key, Object value, OpenTelemetrySchemaVersion schemaVersion) { Objects.requireNonNull(key, "OpenTelemetry attribute name cannot be null."); - key = getMappingsForVersion(schemaVersion).getOrDefault(key, key); + key = mapAttributeName(key, schemaVersion); if (value instanceof String) { span.setAttribute(AttributeKey.stringKey(key), (String) value); } else if (value instanceof Long) { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/FluxTrace.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/FluxTrace.java index 16c4c0a50edc4..e0f0af7c61196 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/FluxTrace.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/FluxTrace.java @@ -60,6 +60,7 @@ protected void hookOnNext(ServiceBusMessageContext message) { Context span = instrumentation.instrumentProcess("ServiceBus.process", message.getMessage(), Context.NONE); message.getMessage().setContext(span); + AutoCloseable scope = tracer.makeSpanCurrent(span); try { downstream.onNext(message); } catch (Throwable t) { @@ -72,7 +73,7 @@ protected void hookOnNext(ServiceBusMessageContext message) { exception = (Throwable) processorException; } } - tracer.endSpan(exception, context, null); + tracer.endSpan(exception, context, scope); } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java index 52dd169e8a18a..076799ac14cf2 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java @@ -34,6 +34,8 @@ import com.azure.core.util.logging.ClientLogger; import com.azure.core.util.metrics.Meter; import com.azure.core.util.metrics.MeterProvider; +import com.azure.core.util.tracing.Tracer; +import com.azure.core.util.tracing.TracerProvider; import com.azure.messaging.servicebus.implementation.MessageUtils; import com.azure.messaging.servicebus.implementation.MessagingEntityType; import com.azure.messaging.servicebus.implementation.ServiceBusAmqpConnection; @@ -42,7 +44,6 @@ import com.azure.messaging.servicebus.implementation.ServiceBusReactorAmqpConnection; import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation; import com.azure.messaging.servicebus.implementation.ServiceBusSharedKeyCredential; -import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer; import com.azure.messaging.servicebus.implementation.ServiceBusProcessorClientOptions; import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import com.azure.messaging.servicebus.models.SubQueue; @@ -64,6 +65,7 @@ import java.util.regex.Pattern; import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_PATH_KEY; +import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_NAMESPACE_VALUE; /** * The builder to create Service Bus clients: @@ -209,7 +211,6 @@ public final class ServiceBusClientBuilder implements private static final Pattern HOST_PORT_PATTERN = Pattern.compile("^[^:]+:\\d+"); private static final Duration MAX_LOCK_RENEW_DEFAULT_DURATION = Duration.ofMinutes(5); private static final ClientLogger LOGGER = new ClientLogger(ServiceBusClientBuilder.class); - private final Object connectionLock = new Object(); private final MessageSerializer messageSerializer = new ServiceBusMessageSerializer(); private ClientOptions clientOptions; @@ -932,8 +933,8 @@ public ServiceBusSenderAsyncClient buildAsyncClient() { clientIdentifier = UUID.randomUUID().toString(); } - final ServiceBusSenderInstrumentation instrumentation = new ServiceBusSenderInstrumentation(ServiceBusTracer.getDefaultTracer(), - createMeter(), connectionProcessor.getFullyQualifiedNamespace(), entityName); + final ServiceBusSenderInstrumentation instrumentation = new ServiceBusSenderInstrumentation( + createTracer(), createMeter(), connectionProcessor.getFullyQualifiedNamespace(), entityName); return new ServiceBusSenderAsyncClient(entityName, entityType, connectionProcessor, retryOptions, instrumentation, messageSerializer, ServiceBusClientBuilder.this::onClientClose, null, clientIdentifier); @@ -1418,7 +1419,7 @@ ServiceBusReceiverAsyncClient buildAsyncClientForProcessor() { connectionProcessor, messageSerializer, receiverOptions, clientIdentifier); final ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation( - ServiceBusTracer.getDefaultTracer(), createMeter(), connectionProcessor.getFullyQualifiedNamespace(), + createTracer(), createMeter(), connectionProcessor.getFullyQualifiedNamespace(), entityPath, subscriptionName, false); return new ServiceBusReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath, entityType, receiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT, @@ -1494,8 +1495,8 @@ private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoComp clientIdentifier = UUID.randomUUID().toString(); } - final ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(ServiceBusTracer.getDefaultTracer(), - createMeter(), connectionProcessor.getFullyQualifiedNamespace(), entityPath, subscriptionName, syncConsumer); + final ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation( + createTracer(), createMeter(), connectionProcessor.getFullyQualifiedNamespace(), entityPath, subscriptionName, syncConsumer); return new ServiceBusSessionReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath, entityType, receiverOptions, connectionProcessor, instrumentation, messageSerializer, ServiceBusClientBuilder.this::onClientClose, clientIdentifier); @@ -1977,8 +1978,8 @@ ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed, bo clientIdentifier = UUID.randomUUID().toString(); } - final ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(ServiceBusTracer.getDefaultTracer(), - createMeter(), connectionProcessor.getFullyQualifiedNamespace(), entityPath, subscriptionName, syncConsumer); + final ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation( + createTracer(), createMeter(), connectionProcessor.getFullyQualifiedNamespace(), entityPath, subscriptionName, syncConsumer); return new ServiceBusReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath, entityType, receiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT, instrumentation, messageSerializer, ServiceBusClientBuilder.this::onClientClose, clientIdentifier); @@ -2074,4 +2075,9 @@ private Meter createMeter() { return MeterProvider.getDefaultProvider().createMeter(LIBRARY_NAME, LIBRARY_VERSION, clientOptions == null ? null : clientOptions.getMetricsOptions()); } + + private Tracer createTracer() { + return TracerProvider.getDefaultProvider().createTracer(LIBRARY_NAME, LIBRARY_VERSION, + AZ_TRACING_NAMESPACE_VALUE, clientOptions == null ? null : clientOptions.getTracingOptions()); + } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java index c864a54dafad1..2a745c1ce2ad7 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java @@ -4,6 +4,7 @@ package com.azure.messaging.servicebus; import com.azure.core.amqp.AmqpRetryPolicy; +import com.azure.core.amqp.AmqpSession; import com.azure.core.amqp.AmqpTransaction; import com.azure.core.amqp.exception.AmqpException; import com.azure.core.amqp.implementation.MessageSerializer; @@ -834,15 +835,11 @@ public Flux receiveMessages() { Flux receiveMessagesNoBackPressure() { return receiveMessagesWithContext(0) .handle((serviceBusMessageContext, sink) -> { - try (AutoCloseable scope = tracer.makeSpanCurrent(serviceBusMessageContext.getMessage().getContext())) { - if (serviceBusMessageContext.hasError()) { - sink.error(serviceBusMessageContext.getThrowable()); - return; - } - sink.next(serviceBusMessageContext.getMessage()); - } catch (Exception ex) { - LOGGER.verbose("Error disposing scope", ex); + if (serviceBusMessageContext.hasError()) { + sink.error(serviceBusMessageContext.getThrowable()); + return; } + sink.next(serviceBusMessageContext.getMessage()); }); } @@ -1199,7 +1196,7 @@ public Mono createTransaction() { return tracer.traceMono("ServiceBus.commitTransaction", connectionProcessor .flatMap(connection -> connection.createSession(TRANSACTION_LINK_NAME)) - .flatMap(transactionSession -> transactionSession.createTransaction()) + .flatMap(AmqpSession::createTransaction) .map(transaction -> new ServiceBusTransactionContext(transaction.getTransactionId()))) .onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RECEIVE)); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java index 7c790711c4d73..eeecb8af3d951 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java @@ -508,7 +508,7 @@ public Flux scheduleMessages(Iterable messages, OffsetD return messageBatch; }) .flatMapMany(messageBatch -> - tracer.traceFluxWithLinks("ServiceBus.scheduleMessages", + tracer.traceScheduleFlux("ServiceBus.scheduleMessages", connectionProcessor .flatMap(connection -> connection.getManagementNode(entityName, entityType)) .flatMapMany(managementNode -> managementNode.schedule(messageBatch.getMessages(), scheduledEnqueueTime, @@ -703,7 +703,7 @@ private Mono scheduleMessageInternal(ServiceBusMessage message, OffsetDate return monoError(LOGGER, new NullPointerException("'scheduledEnqueueTime' cannot be null.")); } - return tracer.traceMonoWithLink("ServiceBus.scheduleMessage", + return tracer.traceScheduleMono("ServiceBus.scheduleMessage", getSendLink().flatMap(link -> link.getLinkSize().flatMap(size -> { int maxSize = size > 0 ? size diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderInstrumentation.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderInstrumentation.java index e533cfc59a44d..8bc2168f36ae5 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderInstrumentation.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderInstrumentation.java @@ -45,13 +45,12 @@ Mono instrumentSendBatch(String spanName, Mono publisher, List { - meter.reportBatchSend(batch.size(), signal.getThrowable(), Context.NONE); - }); + .doOnEach(signal -> meter.reportBatchSend(batch.size(), signal.getThrowable(), Context.NONE)); } } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationAsyncClient.java index 12639967da401..01954019c6588 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationAsyncClient.java @@ -70,12 +70,12 @@ import static com.azure.core.util.FluxUtil.monoError; import static com.azure.core.util.FluxUtil.pagedFluxError; import static com.azure.core.util.FluxUtil.withContext; -import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY; import static com.azure.messaging.servicebus.administration.implementation.EntityHelper.NUMBER_OF_ELEMENTS; import static com.azure.messaging.servicebus.administration.implementation.EntityHelper.QUEUES_ENTITY_TYPE; import static com.azure.messaging.servicebus.administration.implementation.EntityHelper.TOPICS_ENTITY_TYPE; import static com.azure.messaging.servicebus.administration.implementation.EntityHelper.addSupplementaryAuthHeader; import static com.azure.messaging.servicebus.administration.implementation.EntityHelper.extractPage; +import static com.azure.messaging.servicebus.administration.implementation.EntityHelper.getContext; import static com.azure.messaging.servicebus.administration.implementation.EntityHelper.getCreateQueueBody; import static com.azure.messaging.servicebus.administration.implementation.EntityHelper.getCreateRuleBody; import static com.azure.messaging.servicebus.administration.implementation.EntityHelper.getCreateSubscriptionBody; @@ -87,10 +87,8 @@ import static com.azure.messaging.servicebus.administration.implementation.EntityHelper.getSubscriptions; import static com.azure.messaging.servicebus.administration.implementation.EntityHelper.getTitleValue; import static com.azure.messaging.servicebus.administration.implementation.EntityHelper.getTopics; -import static com.azure.messaging.servicebus.administration.implementation.EntityHelper.getTracingContext; import static com.azure.messaging.servicebus.administration.implementation.EntityHelper.getUpdateRuleBody; import static com.azure.messaging.servicebus.administration.implementation.EntityHelper.getUpdateTopicBody; -import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_NAMESPACE_VALUE; import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.SERVICE_BUS_DLQ_SUPPLEMENTARY_AUTHORIZATION_HEADER_NAME; import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.SERVICE_BUS_SUPPLEMENTARY_AUTHORIZATION_HEADER_NAME; @@ -1444,7 +1442,7 @@ Mono> createQueueWithResponse(String queueName, Create } context = context == null ? Context.NONE : context; final Context contextWithHeaders - = getTracingContext(context.addData(AZURE_REQUEST_HTTP_HEADERS_KEY, new HttpHeaders())); + = getContext(context.addData(AZURE_REQUEST_HTTP_HEADERS_KEY, new HttpHeaders())); final String forwardTo = getForwardToEntity(createQueueOptions.getForwardTo(), contextWithHeaders); if (forwardTo != null) { @@ -1495,7 +1493,7 @@ Mono> createRuleWithResponse(String topicName, String s final CreateRuleBody createEntity = getCreateRuleBody(ruleName, ruleOptions); try { return managementClient.getRules().putWithResponseAsync(topicName, subscriptionName, ruleName, createEntity, - null, getTracingContext(context)) + null, getContext(context)) .onErrorMap(ServiceBusAdministrationAsyncClient::mapException) .map(this::deserializeRule); } catch (RuntimeException ex) { @@ -1532,7 +1530,7 @@ Mono> createSubscriptionWithResponse(String top } context = context == null ? Context.NONE : context; final Context contextWithHeaders - = getTracingContext(context.addData(AZURE_REQUEST_HTTP_HEADERS_KEY, new HttpHeaders())); + = getContext(context.addData(AZURE_REQUEST_HTTP_HEADERS_KEY, new HttpHeaders())); final String forwardTo = getForwardToEntity(subscriptionOptions.getForwardTo(), contextWithHeaders); if (forwardTo != null) { @@ -1584,7 +1582,7 @@ Mono> createTopicWithResponse(String topicName, Create } final CreateTopicBody createEntity = getCreateTopicBody(EntityHelper.getTopicDescription(topicOptions)); try { - return entityClient.putWithResponseAsync(topicName, createEntity, null, getTracingContext(context)) + return entityClient.putWithResponseAsync(topicName, createEntity, null, getContext(context)) .onErrorMap(ServiceBusAdministrationAsyncClient::mapException) .map(this::deserializeTopic); } catch (RuntimeException ex) { @@ -1605,7 +1603,7 @@ Mono> deleteQueueWithResponse(String queueName, Context context) return monoError(LOGGER, new IllegalArgumentException("'queueName' cannot be null or empty.")); } try { - return entityClient.deleteWithResponseAsync(queueName, getTracingContext(context)) + return entityClient.deleteWithResponseAsync(queueName, getContext(context)) .onErrorMap(ServiceBusAdministrationAsyncClient::mapException) .map(response -> new SimpleResponse<>(response.getRequest(), response.getStatusCode(), response.getHeaders(), null)); @@ -1639,7 +1637,7 @@ Mono> deleteRuleWithResponse(String topicName, String subscriptio } try { - return rulesClient.deleteWithResponseAsync(topicName, subscriptionName, ruleName, getTracingContext(context)) + return rulesClient.deleteWithResponseAsync(topicName, subscriptionName, ruleName, getContext(context)) .onErrorMap(ServiceBusAdministrationAsyncClient::mapException) .map(response -> new SimpleResponse<>(response.getRequest(), response.getStatusCode(), response.getHeaders(), null)); @@ -1669,7 +1667,7 @@ Mono> deleteSubscriptionWithResponse(String topicName, String sub try { return managementClient.getSubscriptions().deleteWithResponseAsync(topicName, subscriptionName, - getTracingContext(context)) + getContext(context)) .onErrorMap(ServiceBusAdministrationAsyncClient::mapException) .map(response -> new SimpleResponse<>(response.getRequest(), response.getStatusCode(), response.getHeaders(), null)); @@ -1691,7 +1689,7 @@ Mono> deleteTopicWithResponse(String topicName, Context context) return monoError(LOGGER, new IllegalArgumentException("'topicName' cannot be null or empty.")); } try { - return entityClient.deleteWithResponseAsync(topicName, getTracingContext(context)) + return entityClient.deleteWithResponseAsync(topicName, getContext(context)) .onErrorMap(ServiceBusAdministrationAsyncClient::mapException) .map(response -> new SimpleResponse<>(response.getRequest(), response.getStatusCode(), response.getHeaders(), null)); @@ -1739,7 +1737,7 @@ Mono> getQueueWithResponse(String queueName, Context context, return monoError(LOGGER, new IllegalArgumentException("'queueName' cannot be null or empty.")); } try { - return entityClient.getWithResponseAsync(queueName, true, getTracingContext(context)) + return entityClient.getWithResponseAsync(queueName, true, getContext(context)) .onErrorMap(ServiceBusAdministrationAsyncClient::mapException) .handle((response, sink) -> { final Response deserialize = deserializeQueue(response); @@ -1764,7 +1762,7 @@ Mono> getRuleWithResponse(String topicName, String subs String ruleName, Context context) { try { - return rulesClient.getWithResponseAsync(topicName, subscriptionName, ruleName, true, getTracingContext(context)) + return rulesClient.getWithResponseAsync(topicName, subscriptionName, ruleName, true, getContext(context)) .onErrorMap(ServiceBusAdministrationAsyncClient::mapException) .map(this::deserializeRule); } catch (RuntimeException ex) { @@ -1793,7 +1791,7 @@ Mono> getSubscriptionWithResponse(String topicName, String subsc try { return managementClient.getSubscriptions().getWithResponseAsync(topicName, subscriptionName, true, - getTracingContext(context)) + getContext(context)) .onErrorMap(ServiceBusAdministrationAsyncClient::mapException) .handle((response, sink) -> { final Response deserialize = deserializeSubscription(topicName, response); @@ -1854,7 +1852,7 @@ Mono> getTopicWithResponse(String topicName, Context context, } try { - return entityClient.getWithResponseAsync(topicName, true, getTracingContext(context)) + return entityClient.getWithResponseAsync(topicName, true, getContext(context)) .onErrorMap(ServiceBusAdministrationAsyncClient::mapException) .handle((response, sink) -> { final Response deserialize = deserializeTopic(response); @@ -1884,7 +1882,7 @@ Mono> getTopicWithResponse(String topicName, Context context, Mono> listQueuesFirstPage(Context context) { try { - return listQueues(0, getTracingContext(context)); + return listQueues(0, getContext(context)); } catch (RuntimeException e) { return monoError(LOGGER, e); } @@ -1906,7 +1904,7 @@ Mono> listQueuesNextPage(String continuationToken try { final int skip = Integer.parseInt(continuationToken); - return listQueues(skip, getTracingContext(context)); + return listQueues(skip, getContext(context)); } catch (RuntimeException e) { return monoError(LOGGER, e); } @@ -1921,7 +1919,7 @@ Mono> listQueuesNextPage(String continuationToken */ Mono> listRulesFirstPage(String topicName, String subscriptionName, Context context) { try { - return listRules(topicName, subscriptionName, 0, getTracingContext(context)); + return listRules(topicName, subscriptionName, 0, getContext(context)); } catch (RuntimeException e) { return monoError(LOGGER, e); } @@ -1944,7 +1942,7 @@ Mono> listRulesNextPage(String topicName, String s try { final int skip = Integer.parseInt(continuationToken); - return listRules(topicName, subscriptionName, skip, getTracingContext(context)); + return listRules(topicName, subscriptionName, skip, getContext(context)); } catch (RuntimeException e) { return monoError(LOGGER, e); } @@ -1959,7 +1957,7 @@ Mono> listRulesNextPage(String topicName, String s */ Mono> listSubscriptionsFirstPage(String topicName, Context context) { try { - return listSubscriptions(topicName, 0, getTracingContext(context)); + return listSubscriptions(topicName, 0, getContext(context)); } catch (RuntimeException e) { return monoError(LOGGER, e); } @@ -1982,7 +1980,7 @@ Mono> listSubscriptionsNextPage(String top try { final int skip = Integer.parseInt(continuationToken); - return listSubscriptions(topicName, skip, getTracingContext(context)); + return listSubscriptions(topicName, skip, getContext(context)); } catch (RuntimeException e) { return monoError(LOGGER, e); } @@ -1998,7 +1996,7 @@ Mono> listSubscriptionsNextPage(String top Mono> listTopicsFirstPage(Context context) { try { - return listTopics(0, getTracingContext(context)); + return listTopics(0, getContext(context)); } catch (RuntimeException e) { return monoError(LOGGER, e); } @@ -2020,7 +2018,7 @@ Mono> listTopicsNextPage(String continuationToken try { final int skip = Integer.parseInt(continuationToken); - return listTopics(skip, getTracingContext(context)); + return listTopics(skip, getContext(context)); } catch (RuntimeException e) { return monoError(LOGGER, e); } @@ -2042,7 +2040,7 @@ Mono> updateQueueWithResponse(QueueProperties queue, C context = context == null ? Context.NONE : context; final Context contextWithHeaders - = getTracingContext(context.addData(AZURE_REQUEST_HTTP_HEADERS_KEY, new HttpHeaders())); + = getContext(context.addData(AZURE_REQUEST_HTTP_HEADERS_KEY, new HttpHeaders())); final String forwardTo = getForwardToEntity(queue.getForwardTo(), contextWithHeaders); if (forwardTo != null) { queue.setForwardTo(forwardTo); @@ -2085,7 +2083,7 @@ Mono> updateRuleWithResponse(String topicName, String s try { // If-Match == "*" to unconditionally update. This is in line with the existing client library behaviour. return managementClient.getRules().putWithResponseAsync(topicName, subscriptionName, rule.getName(), - ruleBody, "*", getTracingContext(context)) + ruleBody, "*", getContext(context)) .onErrorMap(ServiceBusAdministrationAsyncClient::mapException) .map(this::deserializeRule); } catch (RuntimeException ex) { @@ -2108,8 +2106,7 @@ Mono> updateSubscriptionWithResponse(Subscripti return monoError(LOGGER, new NullPointerException("'subscription' cannot be null")); } context = context == null ? Context.NONE : context; - final Context contextWithHeaders = context.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE) - .addData(AZURE_REQUEST_HTTP_HEADERS_KEY, new HttpHeaders()); + final Context contextWithHeaders = context.addData(AZURE_REQUEST_HTTP_HEADERS_KEY, new HttpHeaders()); final String forwardTo = getForwardToEntity(subscription.getForwardTo(), contextWithHeaders); if (forwardTo != null) { subscription.setForwardTo(forwardTo); @@ -2155,7 +2152,7 @@ Mono> updateTopicWithResponse(TopicProperties topic, C try { // If-Match == "*" to unconditionally update. This is in line with the existing client library behaviour. return entityClient.putWithResponseAsync(topic.getName(), createEntity, "*", - getTracingContext(context)) + getContext(context)) .onErrorMap(ServiceBusAdministrationAsyncClient::mapException) .map(this::deserializeTopic); } catch (RuntimeException ex) { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationClient.java index dd0418f62d30e..5cf71b0179351 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationClient.java @@ -80,7 +80,7 @@ import static com.azure.messaging.servicebus.administration.implementation.EntityHelper.getSubscriptions; import static com.azure.messaging.servicebus.administration.implementation.EntityHelper.getTopicProperties; import static com.azure.messaging.servicebus.administration.implementation.EntityHelper.getTopics; -import static com.azure.messaging.servicebus.administration.implementation.EntityHelper.getTracingContext; +import static com.azure.messaging.servicebus.administration.implementation.EntityHelper.getContext; import static com.azure.messaging.servicebus.administration.implementation.EntityHelper.getUpdateRuleBody; import static com.azure.messaging.servicebus.administration.implementation.EntityHelper.getUpdateSubscriptionBody; import static com.azure.messaging.servicebus.administration.implementation.EntityHelper.getUpdateTopicBody; @@ -411,7 +411,7 @@ public Response createSubscriptionWithResponse(String to } context = context == null ? Context.NONE : context; final Context contextWithHeaders - = enableSyncContext(getTracingContext(context).addData(AZURE_REQUEST_HTTP_HEADERS_KEY, new HttpHeaders())); + = enableSyncContext(getContext(context).addData(AZURE_REQUEST_HTTP_HEADERS_KEY, new HttpHeaders())); final String forwardTo = getForwardToEntity(subscriptionOptions.getForwardTo(), contextWithHeaders); if (forwardTo != null) { subscriptionOptions.setForwardTo(forwardTo); @@ -1809,7 +1809,7 @@ private Response deserializeTopic(Response response) { } private static Context enableSyncContext(Context context) { - return getTracingContext(context).addData(HTTP_REST_PROXY_SYNC_PROXY_ENABLE, true); + return getContext(context).addData(HTTP_REST_PROXY_SYNC_PROXY_ENABLE, true); } private T deserialize(Object object, Class clazz) { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationClientBuilder.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationClientBuilder.java index 4388d07359a6a..3140825742c7b 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationClientBuilder.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationClientBuilder.java @@ -35,6 +35,8 @@ import com.azure.core.util.HttpClientOptions; import com.azure.core.util.builder.ClientBuilderUtil; import com.azure.core.util.logging.ClientLogger; +import com.azure.core.util.tracing.Tracer; +import com.azure.core.util.tracing.TracerProvider; import com.azure.messaging.servicebus.ServiceBusServiceVersion; import com.azure.messaging.servicebus.administration.implementation.ServiceBusManagementClientImpl; import com.azure.messaging.servicebus.administration.implementation.ServiceBusManagementClientImplBuilder; @@ -50,6 +52,8 @@ import java.util.Map; import java.util.Objects; +import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_NAMESPACE_VALUE; + /** * This class provides a fluent builder API to help aid the configuration and instantiation of {@link * ServiceBusAdministrationClient} and {@link ServiceBusAdministrationAsyncClient}. Call @@ -537,6 +541,11 @@ private HttpPipeline createPipeline() { .policies(httpPolicies.toArray(new HttpPipelinePolicy[0])) .httpClient(httpClient) .clientOptions(clientOptions) + .tracer(createTracer()) .build(); } + private Tracer createTracer() { + return TracerProvider.getDefaultProvider().createTracer(CLIENT_NAME, CLIENT_VERSION, + AZ_TRACING_NAMESPACE_VALUE, clientOptions == null ? null : clientOptions.getTracingOptions()); + } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/implementation/EntityHelper.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/implementation/EntityHelper.java index 4ce28222063ef..9e49d9f4c41bb 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/implementation/EntityHelper.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/implementation/EntityHelper.java @@ -68,8 +68,6 @@ import java.util.stream.Collectors; import static com.azure.core.http.policy.AddHeadersFromContextPolicy.AZURE_REQUEST_HTTP_HEADERS_KEY; -import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY; -import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_NAMESPACE_VALUE; /** * Used to access internal methods on {@link QueueProperties}. @@ -895,10 +893,9 @@ public static void validateSubscriptionName(String subscriptionName) { } } - public static Context getTracingContext(Context context) { + public static Context getContext(Context context) { context = context == null ? Context.NONE : context; - return context.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE) - .addData(AZURE_REQUEST_HTTP_HEADERS_KEY, new HttpHeaders()); + return context.addData(AZURE_REQUEST_HTTP_HEADERS_KEY, new HttpHeaders()); } /** diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/instrumentation/ServiceBusReceiverInstrumentation.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/instrumentation/ServiceBusReceiverInstrumentation.java index f56f81cfd962d..f4917f6ec7965 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/instrumentation/ServiceBusReceiverInstrumentation.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/instrumentation/ServiceBusReceiverInstrumentation.java @@ -70,8 +70,8 @@ public Mono instrumentSettlement(Mono publisher, ServiceBusReceivedMes }) .contextWrite(ctx -> { startTime.set(Instant.now().toEpochMilli()); - return ctx.put(REACTOR_PARENT_TRACE_CONTEXT_KEY, tracer.startSpanWithLink(getSettlementSpanName(status), message, - messageContext, messageContext)); + return ctx.put(REACTOR_PARENT_TRACE_CONTEXT_KEY, tracer.startSpanWithLink(getSettlementSpanName(status), ServiceBusTracer.OperationName.SETTLE, + message, messageContext, messageContext)); }); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/instrumentation/ServiceBusTracer.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/instrumentation/ServiceBusTracer.java index bd8a387964d41..7d6348bb80c0f 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/instrumentation/ServiceBusTracer.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/instrumentation/ServiceBusTracer.java @@ -7,46 +7,43 @@ import com.azure.core.util.Configuration; import com.azure.core.util.Context; import com.azure.core.util.logging.ClientLogger; -import com.azure.core.util.tracing.ProcessKind; +import com.azure.core.util.tracing.SpanKind; +import com.azure.core.util.tracing.StartSpanOptions; import com.azure.core.util.tracing.Tracer; +import com.azure.core.util.tracing.TracingLink; import com.azure.messaging.servicebus.ServiceBusMessage; import com.azure.messaging.servicebus.ServiceBusReceivedMessage; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.Signal; import java.time.Instant; import java.time.OffsetDateTime; -import java.time.ZoneOffset; -import java.util.Iterator; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.ServiceLoader; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; -import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY; import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY; import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY; import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY; -import static com.azure.core.util.tracing.Tracer.MESSAGE_ENQUEUED_TIME; import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY; -import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_NAMESPACE_VALUE; /** * Tracing helper. */ public class ServiceBusTracer { - public static final String START_TIME_KEY = "span-start-time"; public static final String REACTOR_PARENT_TRACE_CONTEXT_KEY = "otel-context-key"; private static final AutoCloseable NOOP_CLOSEABLE = () -> { }; private static final ClientLogger LOGGER = new ClientLogger(ServiceBusTracer.class); - protected static final String TRACEPARENT_KEY = "traceparent"; + private static final String TRACEPARENT_KEY = "traceparent"; + private static final String MESSAGING_SYSTEM_ATTRIBUTE_NAME = "messaging.system"; + public static final String MESSAGE_ENQUEUED_TIME_ATTRIBUTE_NAME = "messaging.servicebus.message.enqueued_time"; + public static final String MESSAGE_BATCH_SIZE_ATTRIBUTE_NAME = "messaging.batch.message_count"; + private static final String MESSAGING_OPERATION_ATTRIBUTE_NAME = "messaging.operation"; protected static final boolean IS_TRACING_DISABLED = Configuration.getGlobalConfiguration().get(Configuration.PROPERTY_AZURE_TRACING_DISABLED, false); protected final Tracer tracer; protected final String fullyQualifiedName; @@ -58,37 +55,34 @@ public ServiceBusTracer(Tracer tracer, String fullyQualifiedName, String entityP this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null"); } - /** - * Gets default tracer from SPI. - */ - public static Tracer getDefaultTracer() { - Iterable tracers = ServiceLoader.load(Tracer.class); - Iterator it = tracers.iterator(); - return it.hasNext() ? it.next() : null; - } - /** * Checks if tracing is enabled. */ public boolean isEnabled() { - return tracer != null; + return tracer != null && tracer.isEnabled(); } /** * Makes span in provided context (if any) current. Caller is responsible to close the returned scope. */ public AutoCloseable makeSpanCurrent(Context span) { - return tracer == null ? NOOP_CLOSEABLE : tracer.makeSpanCurrent(span); + return isEnabled() ? tracer.makeSpanCurrent(span) : NOOP_CLOSEABLE; } /** * Traces arbitrary mono. No special send or receive semantics is applied. */ public Mono traceMono(String spanName, Mono publisher) { - if (tracer != null) { + if (isEnabled()) { return publisher - .doOnEach(this::endSpan) - .contextWrite(ctx -> ctx.put(REACTOR_PARENT_TRACE_CONTEXT_KEY, tracer.start(spanName, setAttributes(Context.NONE), ProcessKind.SEND))); + .doOnEach(signal -> { + if (signal.isOnComplete() || signal.isOnError()) { + Context span = signal.getContextView().getOrDefault(REACTOR_PARENT_TRACE_CONTEXT_KEY, Context.NONE); + endSpan(signal.getThrowable(), span, null); + } + }) + .contextWrite(reactor.util.context.Context.of(REACTOR_PARENT_TRACE_CONTEXT_KEY, + tracer.start(spanName, createStartOption(SpanKind.CLIENT, null), Context.NONE))); } return publisher; @@ -98,10 +92,15 @@ public Mono traceMono(String spanName, Mono publisher) { * Traces arbitrary mono that operates with received message as input, e.g. renewLock. No special send or receive semantics is applied. */ public Mono traceMonoWithLink(String spanName, Mono publisher, ServiceBusReceivedMessage message, Context messageContext) { - if (tracer != null) { + if (isEnabled()) { return publisher - .doOnEach(this::endSpan) - .contextWrite(ctx -> ctx.put(REACTOR_PARENT_TRACE_CONTEXT_KEY, startSpanWithLink(spanName, message, messageContext, Context.NONE))); + .doOnEach(signal -> { + if (signal.isOnComplete() || signal.isOnError()) { + Context span = signal.getContextView().getOrDefault(REACTOR_PARENT_TRACE_CONTEXT_KEY, Context.NONE); + endSpan(signal.getThrowable(), span, null); + } + }) + .contextWrite(ctx -> ctx.put(REACTOR_PARENT_TRACE_CONTEXT_KEY, startSpanWithLink(spanName, null, message, messageContext, Context.NONE))); } return publisher; @@ -110,25 +109,23 @@ public Mono traceMonoWithLink(String spanName, Mono publisher, Service /** * Traces arbitrary mono that operates with sent message as input, e.g. schedule. No special send or receive semantics is applied. */ - public Mono traceMonoWithLink(String spanName, Mono publisher, ServiceBusMessage message, Context messageContext) { - if (tracer != null) { - return publisher - .doOnEach(this::endSpan) - .contextWrite(reactor.util.context.Context.of(REACTOR_PARENT_TRACE_CONTEXT_KEY, - startSpanWithLink(spanName, message, messageContext, Context.NONE))); - } - - return publisher; + public Mono traceScheduleMono(String spanName, Mono publisher, ServiceBusMessage message, Context messageContext) { + return traceMonoWithLink(spanName, OperationName.PUBLISH, publisher, message, messageContext); } /** * Traces arbitrary mono that operates with batch of sent message as input, e.g. schedule. No special send or receive semantics is applied. */ - public Flux traceFluxWithLinks(String spanName, Flux publisher, List batch, Function getContext) { - if (tracer != null) { + public Flux traceScheduleFlux(String spanName, Flux publisher, List batch, Function getContext) { + if (isEnabled()) { return publisher - .doOnEach(this::endSpan) - .contextWrite(reactor.util.context.Context.of(REACTOR_PARENT_TRACE_CONTEXT_KEY, startSpanWithLinks(spanName, batch, getContext, Context.NONE))); + .doOnEach(signal -> { + if (signal.isOnComplete() || signal.isOnError()) { + Context span = signal.getContextView().getOrDefault(REACTOR_PARENT_TRACE_CONTEXT_KEY, Context.NONE); + endSpan(signal.getThrowable(), span, null); + } + }) + .contextWrite(reactor.util.context.Context.of(REACTOR_PARENT_TRACE_CONTEXT_KEY, startSpanWithLinks(spanName, OperationName.PUBLISH, batch, getContext, Context.NONE))); } return publisher; } @@ -137,8 +134,8 @@ public Flux traceFluxWithLinks(String spanName, Flux publisher, List applicationProperties = serviceBusMessage.getApplicationProperties(); + String traceparent = getTraceparent(applicationProperties); if (traceparent != null) { // if message has context (in case of retries) or if user supplied it, don't start a message span or add a new context return; } // Starting the span makes the sampling decision (nothing is logged at this time) - Context newMessageContext = setAttributes(messageContext); - - Context eventSpanContext = tracer.start("ServiceBus.message", newMessageContext, ProcessKind.MESSAGE); - Optional traceparentOpt = eventSpanContext.getData(DIAGNOSTIC_ID_KEY); + StartSpanOptions startOptions = createStartOption(SpanKind.PRODUCER, null); - if (traceparentOpt.isPresent()) { - serviceBusMessage.getApplicationProperties().put(DIAGNOSTIC_ID_KEY, traceparentOpt.get().toString()); - serviceBusMessage.getApplicationProperties().put(TRACEPARENT_KEY, traceparentOpt.get().toString()); + Context eventSpanContext = tracer.start("ServiceBus.message", startOptions, messageContext); + tracer.injectContext((key, value) -> { + applicationProperties.put(key, value); + if (TRACEPARENT_KEY.equals(key)) { + applicationProperties.put(DIAGNOSTIC_ID_KEY, value); + } + }, eventSpanContext); - endSpan(null, eventSpanContext, null); + tracer.end(null, null, eventSpanContext); - Optional spanContext = eventSpanContext.getData(SPAN_CONTEXT_KEY); - if (spanContext.isPresent()) { - serviceBusMessage.addContext(SPAN_CONTEXT_KEY, spanContext.get()); - } + Optional spanContext = eventSpanContext.getData(SPAN_CONTEXT_KEY); + if (spanContext.isPresent()) { + serviceBusMessage.addContext(SPAN_CONTEXT_KEY, spanContext.get()); } } @@ -195,25 +193,22 @@ public void reportMessageSpan(ServiceBusMessage serviceBusMessage, Context messa */ public Mono traceManagementReceive(String spanName, Mono publisher, Function getMessageContext) { - if (tracer != null) { - AtomicLong startTime = new AtomicLong(); - AtomicReference message = new AtomicReference<>(); + if (isEnabled()) { + final StartSpanOptions startOptions = createStartOption(SpanKind.CLIENT, OperationName.RECEIVE); return publisher.doOnEach(signal -> { if (signal.hasValue()) { - message.set(signal.get()); + ServiceBusReceivedMessage message = signal.get(); + if (message != null) { + startOptions.addLink(createLink(message.getApplicationProperties(), message.getEnqueuedTime(), getMessageContext.apply(message))); + } } if (signal.isOnComplete() || signal.isOnError()) { - ServiceBusReceivedMessage msg = message.get(); - Context messageContext = msg == null ? null : getMessageContext.apply(msg); - - Context span = startSpanWithLink(spanName, msg, messageContext, new Context(START_TIME_KEY, startTime.get())); - endSpan(null, span, null); + Context span = tracer.start(spanName, startOptions, Context.NONE); + tracer.end(null, signal.getThrowable(), span); } }) - .doOnSubscribe(s -> { - startTime.set(Instant.now().toEpochMilli()); - }); + .doOnSubscribe(s -> startOptions.setStartTimestamp(Instant.now())); } return publisher; } @@ -228,49 +223,46 @@ public Mono traceManagementReceive(String spanName, M * Creates a single span with links to each message being received. */ public Flux traceSyncReceive(String spanName, Flux messages) { - if (tracer != null) { + if (isEnabled()) { + final StartSpanOptions startOptions = createStartOption(SpanKind.CLIENT, OperationName.RECEIVE); return messages .doOnEach(signal -> { - Context builder = signal.getContextView().getOrDefault(REACTOR_PARENT_TRACE_CONTEXT_KEY, Context.NONE); if (signal.hasValue()) { ServiceBusReceivedMessage message = signal.get(); if (message != null) { - addLink(message.getApplicationProperties(), message.getEnqueuedTime(), builder, Context.NONE); + startOptions.addLink(createLink(message.getApplicationProperties(), message.getEnqueuedTime(), Context.NONE)); } } else if (signal.isOnComplete() || signal.isOnError()) { - Context span = tracer.start(spanName, builder, ProcessKind.SEND); - endSpan(signal.getThrowable(), span, null); + int batchSize = startOptions.getLinks() == null ? 0 : startOptions.getLinks().size(); + startOptions.setAttribute(MESSAGE_BATCH_SIZE_ATTRIBUTE_NAME, batchSize); + Context span = tracer.start(spanName, startOptions, Context.NONE); + tracer.end(null, signal.getThrowable(), span); } }) - .contextWrite(reactor.util.context.Context.of(REACTOR_PARENT_TRACE_CONTEXT_KEY, - getBuilder(spanName, new Context(START_TIME_KEY, Instant.now().toEpochMilli())))); + .doOnSubscribe((ignored) -> startOptions.setStartTimestamp(Instant.now())); } return messages; } - public Context startSpanWithLinks(String spanName, List batch, Function getMessageContext, Context parent) { - if (tracer != null) { - Context spanBuilder = getBuilder(spanName, parent); + public Context startSpanWithLinks(String spanName, OperationName operationName, List batch, Function getMessageContext, Context parent) { + if (isEnabled() && batch != null) { + StartSpanOptions startOptions = createStartOption(SpanKind.CLIENT, operationName); + startOptions.setAttribute(MESSAGE_BATCH_SIZE_ATTRIBUTE_NAME, batch.size()); for (ServiceBusMessage message : batch) { - createMessageSpanAndAddLink(message, spanBuilder, getMessageContext.apply(message)); + startOptions.addLink(createLink(message.getApplicationProperties(), null, getMessageContext.apply(message))); } - return tracer.start(spanName, spanBuilder, ProcessKind.SEND); + return tracer.start(spanName, startOptions, parent); } return parent; } - Context startSpanWithLink(String spanName, ServiceBusReceivedMessage message, Context messageContext, Context parent) { - if (tracer != null) { - Context spanBuilder = getBuilder(spanName, parent); - if (message != null) { - addLink(message.getApplicationProperties(), message.getEnqueuedTime(), spanBuilder, messageContext); - } - - // TODO: need to refactor tracing in core. Currently we use ProcessKind.SEND as - // SpanKind.CLIENT - return tracer.start(spanName, spanBuilder, ProcessKind.SEND); + Context startSpanWithLink(String spanName, OperationName operationName, ServiceBusReceivedMessage message, Context messageContext, Context parent) { + if (isEnabled() && message != null) { + StartSpanOptions startOptions = createStartOption(SpanKind.CLIENT, operationName); + startOptions.addLink(createLink(message.getApplicationProperties(), message.getEnqueuedTime(), messageContext)); + return tracer.start(spanName, startOptions, parent); } return parent; @@ -280,77 +272,51 @@ Context startSpanWithLink(String spanName, ServiceBusReceivedMessage message, Co * Starts span. Used by ServiceBus*Instrumentations. */ Context startProcessSpan(String spanName, ServiceBusReceivedMessage message, Context parent) { - if (tracer != null) { - return tracer.start(spanName, setParentAndAttributes(message, parent), ProcessKind.PROCESS); - } + if (isEnabled() && message != null) { + StartSpanOptions startOptions = createStartOption(SpanKind.CONSUMER, OperationName.PROCESS) + .setRemoteParent(extractContext(message.getApplicationProperties())); - return parent; - } - - private Context startSpanWithLink(String name, ServiceBusMessage message, Context messageContext, Context parent) { - if (tracer != null) { - Context spanBuilder = getBuilder(name, parent); - if (message != null) { - createMessageSpanAndAddLink(message, spanBuilder, messageContext); - } + startOptions.setAttribute(MESSAGE_ENQUEUED_TIME_ATTRIBUTE_NAME, message.getEnqueuedTime()); - return tracer.start(name, spanBuilder, ProcessKind.SEND); + return tracer.start(spanName, startOptions, parent); } return parent; } - private void createMessageSpanAndAddLink(ServiceBusMessage message, Context spanBuilder, Context messageContext) { - if (tracer != null) { - String traceparent = getTraceparent(message.getApplicationProperties()); - - if (traceparent == null) { - reportMessageSpan(message, messageContext); - } - - addLink(message.getApplicationProperties(), null, spanBuilder, messageContext); - } - } - - private void addLink(Map applicationProperties, OffsetDateTime enqueuedTime, Context spanBuilder, Context messageContext) { - if (tracer != null) { - Optional linkContext = messageContext == null ? Optional.empty() : messageContext.getData(SPAN_CONTEXT_KEY); - if (!linkContext.isPresent()) { - String traceparent = getTraceparent(applicationProperties); - Context link = traceparent == null ? Context.NONE : tracer.extractContext(traceparent, Context.NONE); - linkContext = link.getData(SPAN_CONTEXT_KEY); - } - - if (enqueuedTime != null) { - spanBuilder = spanBuilder.addData(MESSAGE_ENQUEUED_TIME, enqueuedTime.toInstant().atOffset(ZoneOffset.UTC).toEpochSecond()); - } - - if (linkContext.isPresent()) { - tracer.addLink(spanBuilder.addData(SPAN_CONTEXT_KEY, linkContext.get())); - } + private TracingLink createLink(Map applicationProperties, OffsetDateTime enqueuedTime, Context eventContext) { + Context link; + Optional linkContext = eventContext.getData(SPAN_CONTEXT_KEY); + if (linkContext.isPresent()) { + link = linkContext.get() instanceof Context ? (Context) linkContext.get() : Context.NONE; + } else { + link = extractContext(applicationProperties); } - } - private Context setParentAndAttributes(ServiceBusReceivedMessage message, Context parent) { - if (message.getEnqueuedTime() != null) { - parent = parent.addData(MESSAGE_ENQUEUED_TIME, message.getEnqueuedTime().toInstant().atOffset(ZoneOffset.UTC).toEpochSecond()); + Map linkAttributes = null; + if (enqueuedTime != null) { + linkAttributes = Collections.singletonMap(MESSAGE_ENQUEUED_TIME_ATTRIBUTE_NAME, enqueuedTime.toEpochSecond()); } - parent = getParent(message.getApplicationProperties(), parent); - - return parent - .addData(Tracer.ENTITY_PATH_KEY, entityPath) - .addData(HOST_NAME_KEY, fullyQualifiedName) - .addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE); + return new TracingLink(link, linkAttributes); } - private Context getParent(Map properties, Context context) { - if (tracer == null) { - return context; + private Context extractContext(Map applicationProperties) { + if (applicationProperties == null) { + return Context.NONE; } - String traceparent = getTraceparent(properties); - return traceparent == null ? context : tracer.extractContext(traceparent, context); + return tracer.extractContext(key -> { + if (TRACEPARENT_KEY.equals(key)) { + return getTraceparent(applicationProperties); + } else { + Object value = applicationProperties.get(key); + if (value != null) { + return value.toString(); + } + } + return null; + }); } private static String getTraceparent(Map applicationProperties) { @@ -362,27 +328,57 @@ private static String getTraceparent(Map applicationProperties) return diagnosticId == null ? null : diagnosticId.toString(); } - private Context setAttributes(Context context) { - return context - .addData(ENTITY_PATH_KEY, entityPath) - .addData(HOST_NAME_KEY, fullyQualifiedName) - .addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE); + private StartSpanOptions createStartOption(SpanKind kind, OperationName operationName) { + StartSpanOptions startOptions = new StartSpanOptions(kind) + .setAttribute(MESSAGING_SYSTEM_ATTRIBUTE_NAME, "servicebus") + .setAttribute(ENTITY_PATH_KEY, entityPath) + .setAttribute(HOST_NAME_KEY, fullyQualifiedName); + + if (operationName != null) { + startOptions.setAttribute(MESSAGING_OPERATION_ATTRIBUTE_NAME, operationName.toString()); + } + + return startOptions; } - private Context getBuilder(String spanName, Context context) { - if (tracer != null) { - return setAttributes(tracer.getSharedSpanBuilder(spanName, context)); + private Mono traceMonoWithLink(String spanName, OperationName operationName, Mono publisher, ServiceBusMessage message, Context messageContext) { + if (isEnabled()) { + return publisher + .doOnEach(signal -> { + if (signal.isOnComplete() || signal.isOnError()) { + Context span = signal.getContextView().getOrDefault(REACTOR_PARENT_TRACE_CONTEXT_KEY, Context.NONE); + endSpan(signal.getThrowable(), span, null); + } + }) + .contextWrite(ctx -> { + StartSpanOptions startSpanOptions = createStartOption(SpanKind.CLIENT, operationName); + if (message != null) { + reportMessageSpan(message, messageContext); + startSpanOptions.addLink(createLink(message.getApplicationProperties(), null, messageContext)); + } + + Context span = tracer.start(spanName, startSpanOptions, Context.NONE); + return ctx.put(REACTOR_PARENT_TRACE_CONTEXT_KEY, span); + }); } - return context; + return publisher; } - private void endSpan(Signal signal) { - if (tracer == null) { - return; + public enum OperationName { + PUBLISH("publish"), + RECEIVE("receive"), + SETTLE("settle"), + PROCESS("process"); + + private final String operationName; + OperationName(String operationName) { + this.operationName = operationName; } - Context span = signal.getContextView().getOrDefault(REACTOR_PARENT_TRACE_CONTEXT_KEY, Context.NONE); - endSpan(signal.getThrowable(), span, null); + @Override + public String toString() { + return operationName; + } } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java index cd5402e178f4f..1aa3482fdb659 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java @@ -5,8 +5,10 @@ import com.azure.core.util.BinaryData; import com.azure.core.util.Context; -import com.azure.core.util.tracing.ProcessKind; +import com.azure.core.util.tracing.SpanKind; +import com.azure.core.util.tracing.StartSpanOptions; import com.azure.core.util.tracing.Tracer; +import com.azure.core.util.tracing.TracingLink; import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation; import com.azure.messaging.servicebus.implementation.ServiceBusProcessorClientOptions; import org.junit.jupiter.api.Assertions; @@ -24,12 +26,16 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY; -import static com.azure.core.util.tracing.Tracer.MESSAGE_ENQUEUED_TIME; +import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY; +import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY; import static com.azure.core.util.tracing.Tracer.PARENT_TRACE_CONTEXT_KEY; import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY; +import static com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer.MESSAGE_ENQUEUED_TIME_ATTRIBUTE_NAME; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -370,23 +376,25 @@ public void testUserMessageHandlerErrorWithAutoCompleteDisabled() throws Interru } @Test + @SuppressWarnings("unchecked") public void testProcessorWithTracingEnabled() throws InterruptedException { final Tracer tracer = mock(Tracer.class); final int numberOfTimes = 5; String diagnosticId = "00-08ee063508037b1719dddcbf248e30e2-1365c684eb25daed-01"; - - when(tracer.extractContext(eq(diagnosticId), any())).thenAnswer( - invocation -> { - Context passed = invocation.getArgument(1, Context.class); - return passed.addData(SPAN_CONTEXT_KEY, "value"); - } - ); - when(tracer.start(eq("ServiceBus.process"), any(), eq(ProcessKind.PROCESS))).thenAnswer( + when(tracer.isEnabled()).thenReturn(true); + when(tracer.extractContext(any())).thenAnswer(invocation -> { + Function consumer = invocation.getArgument(0, Function.class); + assertEquals(diagnosticId, consumer.apply("traceparent")); + assertNull(consumer.apply("tracestate")); + return new Context(SPAN_CONTEXT_KEY, "value"); + }); + + when(tracer.start(eq("ServiceBus.process"), any(StartSpanOptions.class), any())).thenAnswer( invocation -> { - Context passed = invocation.getArgument(1, Context.class); - assertTrue(passed.getData(MESSAGE_ENQUEUED_TIME).isPresent()); - return passed.addData(SPAN_CONTEXT_KEY, "value1") + assertStartOptions(invocation.getArgument(1, StartSpanOptions.class), 0); + Context passed = invocation.getArgument(2, Context.class); + return passed .addData(PARENT_TRACE_CONTEXT_KEY, "value2"); } ); @@ -422,27 +430,57 @@ public void testProcessorWithTracingEnabled() throws InterruptedException { serviceBusProcessorClient.close(); assertTrue(success, "Failed to receive all expected messages"); - verify(tracer, times(numberOfTimes)).extractContext(eq(diagnosticId), any()); - verify(tracer, times(numberOfTimes)).start(eq("ServiceBus.process"), any(), eq(ProcessKind.PROCESS)); + verify(tracer, times(numberOfTimes)).extractContext(any()); + verify(tracer, times(numberOfTimes)).start(eq("ServiceBus.process"), any(StartSpanOptions.class), any(Context.class)); // This is one less because the processEvent is called before the end span call, so it is possible for // to reach this line without calling it the 5th time yet. (Timing issue.) - verify(tracer, atLeast(numberOfTimes - 1)).end(eq("success"), isNull(), any()); + verify(tracer, atLeast(numberOfTimes - 1)).end(isNull(), isNull(), any()); + } + + @Test + @SuppressWarnings("unchecked") + public void testProcessorWithTracingDisabled() throws InterruptedException { + final Tracer tracer = mock(Tracer.class); + + when(tracer.isEnabled()).thenReturn(false); + + Flux messageFlux = + Flux.create(emitter -> { + ServiceBusReceivedMessage serviceBusReceivedMessage = + new ServiceBusReceivedMessage(BinaryData.fromString("hello")); + emitter.next(new ServiceBusMessageContext(serviceBusReceivedMessage)); + }); + + ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder = getBuilder(messageFlux, tracer); + + CountDownLatch countDownLatch = new CountDownLatch(1); + ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, ENTITY_NAME, + null, null, + messageContext -> countDownLatch.countDown(), + error -> Assertions.fail("Error occurred when receiving messages from the processor"), + new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1)); + serviceBusProcessorClient.start(); + boolean success = countDownLatch.await(1, TimeUnit.SECONDS); + serviceBusProcessorClient.close(); + + assertTrue(success, "Failed to receive message"); + verify(tracer, never()).extractContext(any()); + verify(tracer, never()).start(eq("ServiceBus.process"), any(StartSpanOptions.class), any(Context.class)); + verify(tracer, never()).end(any(), any(), any()); } @Test public void testProcessorWithTracingEnabledWithoutDiagnosticId() throws InterruptedException { final Tracer tracer = mock(Tracer.class); final int numberOfTimes = 5; - - when(tracer.start(eq("ServiceBus.process"), any(), eq(ProcessKind.PROCESS))).thenAnswer( + when(tracer.isEnabled()).thenReturn(true); + when(tracer.start(eq("ServiceBus.process"), any(StartSpanOptions.class), any())).thenAnswer( invocation -> { - Context passed = invocation.getArgument(1, Context.class); - assertTrue(passed.getData(MESSAGE_ENQUEUED_TIME).isPresent()); + assertStartOptions(invocation.getArgument(1, StartSpanOptions.class), 0); + Context passed = invocation.getArgument(2, Context.class); return passed - .addData(SPAN_CONTEXT_KEY, "value1") - .addData("scope", (AutoCloseable) () -> { }) .addData(PARENT_TRACE_CONTEXT_KEY, "value2"); } ); @@ -477,11 +515,11 @@ public void testProcessorWithTracingEnabledWithoutDiagnosticId() throws Interrup serviceBusProcessorClient.close(); assertTrue(success, "Failed to receive all expected messages"); - verify(tracer, times(numberOfTimes)).start(eq("ServiceBus.process"), any(), eq(ProcessKind.PROCESS)); + verify(tracer, times(numberOfTimes)).start(eq("ServiceBus.process"), any(StartSpanOptions.class), any(Context.class)); // This is one less because the processEvent is called before the end span call, so it is possible for // to reach this line without calling it the 5th time yet. (Timing issue.) - verify(tracer, atLeast(numberOfTimes - 1)).end(eq("success"), isNull(), any()); + verify(tracer, atLeast(numberOfTimes - 1)).end(isNull(), isNull(), any()); } @@ -521,4 +559,20 @@ private ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder getSessio doNothing().when(asyncClient).close(); return receiverBuilder; } + + private void assertStartOptions(StartSpanOptions startOpts, int linkCount) { + assertEquals(SpanKind.CONSUMER, startOpts.getSpanKind()); + assertEquals(ENTITY_NAME, startOpts.getAttributes().get(ENTITY_PATH_KEY)); + assertEquals(NAMESPACE, startOpts.getAttributes().get(HOST_NAME_KEY)); + + if (linkCount == 0) { + assertTrue(startOpts.getAttributes().containsKey(MESSAGE_ENQUEUED_TIME_ATTRIBUTE_NAME)); + assertNull(startOpts.getLinks()); + } else { + assertEquals(linkCount, startOpts.getLinks().size()); + for (TracingLink link : startOpts.getLinks()) { + assertTrue(link.getAttributes().containsKey(MESSAGE_ENQUEUED_TIME_ATTRIBUTE_NAME)); + } + } + } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java index 6d6bbdd775cf1..df40859107311 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java @@ -22,7 +22,7 @@ import com.azure.core.util.ClientOptions; import com.azure.core.util.Context; import com.azure.core.util.logging.ClientLogger; -import com.azure.core.util.tracing.ProcessKind; +import com.azure.core.util.tracing.StartSpanOptions; import com.azure.core.util.tracing.Tracer; import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusReceiverClientBuilder; import com.azure.messaging.servicebus.implementation.DispositionStatus; @@ -1456,6 +1456,7 @@ void receiveWithTracesAndMetrics() { final List messages = getMessages(); TestMeter meter = new TestMeter(); Tracer tracer = mock(Tracer.class); + when(tracer.isEnabled()).thenReturn(true); ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(tracer, meter, NAMESPACE, ENTITY_PATH, SUBSCRIPTION_NAME, false); receiver = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, @@ -1466,9 +1467,8 @@ void receiveWithTracesAndMetrics() { Context spanReceive1 = new Context("marker1", true); Context spanReceive2 = new Context("marker2", true); Context spanSettle = new Context("marker3", true); - when(tracer.start(eq("ServiceBus.process"), any(Context.class), eq(ProcessKind.PROCESS))).thenReturn(spanReceive1, spanReceive2); - when(tracer.getSharedSpanBuilder(any(), any(Context.class))).thenReturn(Context.NONE); - when(tracer.start(any(), any(Context.class), eq(ProcessKind.SEND))).thenReturn(spanSettle); + when(tracer.start(eq("ServiceBus.process"), any(StartSpanOptions.class), any(Context.class))).thenReturn(spanReceive1, spanReceive2); + when(tracer.start(eq("ServiceBus.complete"), any(StartSpanOptions.class), any(Context.class))).thenReturn(spanSettle); when(receivedMessage.getLockToken()).thenReturn("mylockToken"); when(receivedMessage.getSequenceNumber()).thenReturn(42L); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java index d8255e0ab979a..14e0e3c29777f 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java @@ -21,7 +21,8 @@ import com.azure.core.util.BinaryData; import com.azure.core.util.ClientOptions; import com.azure.core.util.Context; -import com.azure.core.util.tracing.ProcessKind; +import com.azure.core.util.tracing.SpanKind; +import com.azure.core.util.tracing.StartSpanOptions; import com.azure.core.util.tracing.Tracer; import com.azure.messaging.servicebus.implementation.MessagingEntityType; import com.azure.messaging.servicebus.implementation.ServiceBusAmqpConnection; @@ -61,19 +62,19 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.IntStream; -import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY; -import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY; +import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY; +import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY; import static com.azure.core.util.tracing.Tracer.PARENT_TRACE_CONTEXT_KEY; -import static com.azure.core.util.tracing.Tracer.SPAN_BUILDER_KEY; import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY; import static com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.MAX_MESSAGE_LENGTH_BYTES; -import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_NAMESPACE_VALUE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyList; @@ -81,6 +82,7 @@ import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -391,12 +393,13 @@ void sendMultipleMessages() { } @Test + @SuppressWarnings("unchecked") void sendMultipleMessagesTracesSpans() { // Arrange final int count = 4; final byte[] contents = TEST_CONTENTS.toBytes(); final Tracer tracer1 = mock(Tracer.class); - String traceparent = "traceparent"; + when(tracer1.isEnabled()).thenReturn(true); ServiceBusSenderInstrumentation instrumentation = new ServiceBusSenderInstrumentation(tracer1, null, NAMESPACE, ENTITY_NAME); final ServiceBusMessageBatch batch = new ServiceBusMessageBatch(256 * 1024, @@ -407,32 +410,29 @@ void sendMultipleMessagesTracesSpans() { when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER))) .thenReturn(Mono.just(sendLink)); when(sendLink.send(anyList())).thenReturn(Mono.empty()); - when(tracer1.start(eq("ServiceBus.send"), any(Context.class), eq(ProcessKind.SEND))) - .thenAnswer(invocation -> { - Context passed = invocation.getArgument(1, Context.class); - assertEquals(passed.getData(AZ_TRACING_NAMESPACE_KEY).get(), AZ_TRACING_NAMESPACE_VALUE); - return passed.addData(PARENT_TRACE_CONTEXT_KEY, "value"); - }); - - when(tracer1.extractContext(eq(traceparent), any(Context.class))).thenAnswer(invocation -> { - Context passed = invocation.getArgument(1, Context.class); - return passed.addData(SPAN_CONTEXT_KEY, "span-context"); - }); - when(tracer1.start(eq("ServiceBus.message"), any(Context.class), eq(ProcessKind.MESSAGE))) - .thenAnswer(invocation -> { - Context passed = invocation.getArgument(1, Context.class); - assertEquals(passed.getData(AZ_TRACING_NAMESPACE_KEY).get(), AZ_TRACING_NAMESPACE_VALUE); - return passed.addData(PARENT_TRACE_CONTEXT_KEY, "value").addData(DIAGNOSTIC_ID_KEY, traceparent); + when(tracer1.start(eq("ServiceBus.message"), any(), any(Context.class))).thenAnswer( + invocation -> { + assertStartOptions(invocation.getArgument(1, StartSpanOptions.class), SpanKind.PRODUCER, 0); + return invocation.getArgument(2, Context.class) + .addData(SPAN_CONTEXT_KEY, "span"); }); - when(tracer1.getSharedSpanBuilder(eq("ServiceBus.send"), any(Context.class))).thenAnswer( + when(tracer1.start(eq("ServiceBus.send"), any(), any(Context.class))).thenAnswer( invocation -> { - Context passed = invocation.getArgument(1, Context.class); - return passed.addData(SPAN_BUILDER_KEY, "value"); + assertStartOptions(invocation.getArgument(1, StartSpanOptions.class), SpanKind.CLIENT, count); + return invocation.getArgument(2, Context.class) + .addData(PARENT_TRACE_CONTEXT_KEY, "trace-context"); } ); + doAnswer(invocation -> { + BiConsumer injectContext = invocation.getArgument(0, BiConsumer.class); + injectContext.accept("traceparent", "diag-id"); + return null; + }).when(tracer1).injectContext(any(), any(Context.class)); + + IntStream.range(0, count).forEach(index -> { final ServiceBusMessage message = new ServiceBusMessage(BinaryData.fromBytes(contents)); Assertions.assertTrue(batch.tryAddMessage(message)); @@ -444,10 +444,10 @@ void sendMultipleMessagesTracesSpans() { // Assert verify(tracer1, times(4)) - .start(eq("ServiceBus.message"), any(Context.class), eq(ProcessKind.MESSAGE)); + .start(eq("ServiceBus.message"), any(StartSpanOptions.class), any(Context.class)); verify(tracer1, times(1)) - .start(eq("ServiceBus.send"), any(Context.class), eq(ProcessKind.SEND)); - verify(tracer1, times(5)).end(eq("success"), isNull(), any(Context.class)); + .start(eq("ServiceBus.send"), any(StartSpanOptions.class), any(Context.class)); + verify(tracer1, times(5)).end(isNull(), isNull(), any(Context.class)); } @Test @@ -491,6 +491,7 @@ void sendMessageReportsMetricsAndTraces() { // Arrange TestMeter meter = new TestMeter(); Tracer tracer = mock(Tracer.class); + when(tracer.isEnabled()).thenReturn(true); ServiceBusSenderInstrumentation instrumentation = new ServiceBusSenderInstrumentation(tracer, meter, NAMESPACE, ENTITY_NAME); sender = new ServiceBusSenderAsyncClient(ENTITY_NAME, MessagingEntityType.QUEUE, connectionProcessor, @@ -501,12 +502,11 @@ void sendMessageReportsMetricsAndTraces() { when(sendLink.send(any(Message.class))).thenReturn(Mono.empty()); Context span = new Context("marker", true); - when(tracer.start(eq("ServiceBus.send"), any(Context.class), eq(ProcessKind.SEND))) + when(tracer.start(eq("ServiceBus.send"), any(StartSpanOptions.class), any(Context.class))) .thenReturn(span); - when(tracer.extractContext(any(), any(Context.class))).thenReturn(Context.NONE); - when(tracer.start(eq("ServiceBus.message"), any(Context.class), any())).thenReturn(Context.NONE); - when(tracer.getSharedSpanBuilder(eq("ServiceBus.send"), any(Context.class))).thenReturn(Context.NONE); + when(tracer.extractContext(any())).thenReturn(Context.NONE); + when(tracer.start(eq("ServiceBus.message"), any(StartSpanOptions.class), any(Context.class))).thenReturn(Context.NONE); // Act StepVerifier.create(sender.sendMessage(new ServiceBusMessage(TEST_CONTENTS))) @@ -906,4 +906,16 @@ private void assertCommonMetricAttributes(Map attributes, boolea assertEquals(ENTITY_NAME, attributes.get("entityName")); assertEquals(success ? "ok" : "error", attributes.get("status")); } + + private void assertStartOptions(StartSpanOptions startOpts, SpanKind kind, int linkCount) { + assertEquals(kind, startOpts.getSpanKind()); + assertEquals(ENTITY_NAME, startOpts.getAttributes().get(ENTITY_PATH_KEY)); + assertEquals(NAMESPACE, startOpts.getAttributes().get(HOST_NAME_KEY)); + + if (linkCount == 0) { + assertNull(startOpts.getLinks()); + } else { + assertEquals(linkCount, startOpts.getLinks().size()); + } + } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TracingIntegrationTests.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TracingIntegrationTests.java index 032e639218aad..84d58d464542a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TracingIntegrationTests.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TracingIntegrationTests.java @@ -4,6 +4,7 @@ package com.azure.messaging.servicebus; import com.azure.core.util.logging.ClientLogger; +import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer; import com.azure.messaging.servicebus.models.DeferOptions; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.common.AttributeKey; @@ -43,6 +44,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -50,10 +53,11 @@ @Execution(ExecutionMode.SAME_THREAD) public class TracingIntegrationTests extends IntegrationTestBase { private TestSpanProcessor spanProcessor; + private List toClose = new ArrayList<>(); private ServiceBusSenderAsyncClient sender; - ServiceBusReceiverAsyncClient receiver; - ServiceBusReceiverClient receiverSync; - ServiceBusProcessorClient processor; + private ServiceBusReceiverAsyncClient receiver; + private ServiceBusReceiverClient receiverSync; + private ServiceBusProcessorClient processor; public TracingIntegrationTests() { super(new ClientLogger(TracingIntegrationTests.class)); @@ -87,6 +91,9 @@ protected void beforeTest() { .receiver() .queueName(getQueueName(0)) .buildClient(); + toClose.add(sender); + toClose.add(receiver); + toClose.add(receiverSync); StepVerifier.setDefaultTimeout(TIMEOUT); } @@ -95,10 +102,15 @@ protected void beforeTest() { protected void afterTest() { GlobalOpenTelemetry.resetForTest(); sharedBuilder = null; + + if (processor != null) { + toClose.add(processor); + } + try { - dispose(receiver, sender, processor, receiverSync); + dispose(toClose.toArray(new AutoCloseable[0])); } catch (Exception e) { - logger.warning("Error occurred when draining queue.", e); + logger.warning("Error disposing resources.", e); } } @@ -137,17 +149,17 @@ public void sendAndReceive() throws InterruptedException { assertMessageSpan(messageSpans.get(1), message2); List send = findSpans(spans, "ServiceBus.send"); - assertSendSpan(send.get(0), messages, "ServiceBus.send"); + assertClientProducerSpan(send.get(0), messages, "ServiceBus.send", "publish"); List processed = findSpans(spans, "ServiceBus.process"); assertConsumerSpan(processed.get(0), received.get(0), "ServiceBus.process"); assertConsumerSpan(processed.get(1), received.get(1), "ServiceBus.process"); List completed = findSpans(spans, "ServiceBus.complete"); - assertReceiveSpan(completed.get(0), Collections.singletonList(received.get(0)), "ServiceBus.complete"); + assertClientSpan(completed.get(0), Collections.singletonList(received.get(0)), "ServiceBus.complete", "settle"); assertParentFound(completed.get(0), processed); - assertReceiveSpan(completed.get(1), Collections.singletonList(received.get(1)), "ServiceBus.complete"); + assertClientSpan(completed.get(1), Collections.singletonList(received.get(1)), "ServiceBus.complete", "settle"); assertParentFound(completed.get(1), processed); } @@ -252,6 +264,7 @@ public void sendAndReceiveParallelAutoComplete() throws InterruptedException { .queueName(getQueueName(0)) .buildAsyncClient(); + toClose.add(receiverAutoComplete); StepVerifier.create( receiverAutoComplete.receiveMessages() .take(messageCount) @@ -302,7 +315,7 @@ public void sendPeekRenewLockAndDefer() throws InterruptedException { .flatMap(m -> receiver.receiveDeferredMessage(m.getSequenceNumber()).thenReturn(m)) .subscribe(m -> { if (traceparent.equals(m.getApplicationProperties().get("traceparent"))) { - receivedMessage.set(m); + receivedMessage.compareAndSet(null, m); latch.countDown(); } }); @@ -314,22 +327,22 @@ public void sendPeekRenewLockAndDefer() throws InterruptedException { assertEquals(0, findSpans(spans, "ServiceBus.message").size()); List send = findSpans(spans, "ServiceBus.send"); - assertSendSpan(send.get(0), Collections.singletonList(message), "ServiceBus.send"); + assertClientProducerSpan(send.get(0), Collections.singletonList(message), "ServiceBus.send", "publish"); List process = findSpans(spans, "ServiceBus.process", traceId); assertConsumerSpan(process.get(0), receivedMessage.get(), "ServiceBus.process"); List renewMessageLock = findSpans(spans, "ServiceBus.renewMessageLock", traceId); - assertReceiveSpan(renewMessageLock.get(0), Collections.singletonList(receivedMessage.get()), "ServiceBus.renewMessageLock"); + assertClientSpan(renewMessageLock.get(0), Collections.singletonList(receivedMessage.get()), "ServiceBus.renewMessageLock", null); assertParent(renewMessageLock.get(0), process.get(0)); // for correlation to work after first async call, we need to enable otel rector instrumentations, // so no correlation beyond this point List defer = findSpans(spans, "ServiceBus.defer"); - assertReceiveSpan(defer.get(0), Collections.singletonList(receivedMessage.get()), "ServiceBus.defer"); + assertClientSpan(defer.get(0), Collections.singletonList(receivedMessage.get()), "ServiceBus.defer", "settle"); List receiveDeferredMessage = findSpans(spans, "ServiceBus.receiveDeferredMessage"); - assertReceiveSpan(receiveDeferredMessage.get(0), Collections.singletonList(receivedMessage.get()), "ServiceBus.receiveDeferredMessage"); + assertClientSpan(receiveDeferredMessage.get(0), Collections.singletonList(receivedMessage.get()), "ServiceBus.receiveDeferredMessage", "receive"); } @Test @@ -346,16 +359,16 @@ public void sendReceiveRenewLockAndDeferSync() { List spans = spanProcessor.getEndedSpans(); List received = findSpans(spans, "ServiceBus.receiveMessages"); - assertReceiveSpan(received.get(0), Collections.singletonList(receivedMessage), "ServiceBus.receiveMessages"); + assertClientSpan(received.get(0), Collections.singletonList(receivedMessage), "ServiceBus.receiveMessages", "receive"); List renewMessageLock = findSpans(spans, "ServiceBus.renewMessageLock"); - assertReceiveSpan(renewMessageLock.get(0), Collections.singletonList(receivedMessage), "ServiceBus.renewMessageLock"); + assertClientSpan(renewMessageLock.get(0), Collections.singletonList(receivedMessage), "ServiceBus.renewMessageLock", null); List defer = findSpans(spans, "ServiceBus.defer"); - assertReceiveSpan(defer.get(0), Collections.singletonList(receivedMessage), "ServiceBus.defer"); + assertClientSpan(defer.get(0), Collections.singletonList(receivedMessage), "ServiceBus.defer", "settle"); List receiveDeferredMessage = findSpans(spans, "ServiceBus.receiveDeferredMessage"); - assertReceiveSpan(receiveDeferredMessage.get(0), Collections.singletonList(receivedMessage), "ServiceBus.receiveDeferredMessage"); + assertClientSpan(receiveDeferredMessage.get(0), Collections.singletonList(receivedMessage), "ServiceBus.receiveDeferredMessage", "receive"); } @Test @@ -374,13 +387,13 @@ public void syncReceive() { List spans = spanProcessor.getEndedSpans(); List received = findSpans(spans, "ServiceBus.receiveMessages"); - assertReceiveSpan(received.get(0), receivedMessages, "ServiceBus.receiveMessages"); + assertClientSpan(received.get(0), receivedMessages, "ServiceBus.receiveMessages", "receive"); assertEquals(0, findSpans(spans, "ServiceBus.process").size()); List completed = findSpans(spans, "ServiceBus.complete"); - assertReceiveSpan(completed.get(0), Collections.singletonList(receivedMessages.get(0)), "ServiceBus.complete"); - assertReceiveSpan(completed.get(1), Collections.singletonList(receivedMessages.get(1)), "ServiceBus.complete"); + assertClientSpan(completed.get(0), Collections.singletonList(receivedMessages.get(0)), "ServiceBus.complete", "settle"); + assertClientSpan(completed.get(1), Collections.singletonList(receivedMessages.get(1)), "ServiceBus.complete", "settle"); } @Test @@ -391,8 +404,8 @@ public void syncReceiveTimeout() { List spans = spanProcessor.getEndedSpans(); List received = findSpans(spans, "ServiceBus.receiveMessages"); - assertReceiveSpan(received.get(0), receivedMessages, "ServiceBus.receiveMessages"); - assertEquals(StatusCode.OK, received.get(0).toSpanData().getStatus().getStatusCode()); + assertClientSpan(received.get(0), receivedMessages, "ServiceBus.receiveMessages", "receive"); + assertEquals(StatusCode.UNSET, received.get(0).toSpanData().getStatus().getStatusCode()); assertEquals(0, findSpans(spans, "ServiceBus.process").size()); } @@ -404,8 +417,15 @@ public void peekMessage() { StepVerifier.create(receiver.peekMessage()) .assertNext(receivedMessage -> { - List received = findSpans(spanProcessor.getEndedSpans(), "ServiceBus.peekMessage"); - assertReceiveSpan(received.get(0), Collections.singletonList(receivedMessage), "ServiceBus.peekMessage"); + ReadableSpan received = findSpans(spanProcessor.getEndedSpans(), "ServiceBus.peekMessage").get(0); + if (receivedMessage.getApplicationProperties().containsKey("traceparent")) { + assertClientSpan(received, Collections.singletonList(receivedMessage), "ServiceBus.peekMessage", "receive"); + } else { + assertEquals("ServiceBus.peekMessage", received.getName()); + assertEquals(SpanKind.CLIENT, received.getKind()); + assertEquals(0, received.toSpanData().getLinks().size()); + assertEquals("receive", received.getAttribute(AttributeKey.stringKey("messaging.operation"))); + } }) .verifyComplete(); } @@ -416,7 +436,7 @@ public void peekNonExistingMessage() { .verifyComplete(); List received = findSpans(spanProcessor.getEndedSpans(), "ServiceBus.peekMessage"); - assertReceiveSpan(received.get(0), Collections.emptyList(), "ServiceBus.peekMessage"); + assertClientSpan(received.get(0), Collections.emptyList(), "ServiceBus.peekMessage", "receive"); } @Test @@ -431,14 +451,14 @@ public void sendAndProcess() throws InterruptedException { String message1SpanId = message.getApplicationProperties().get("traceparent").toString().substring(36, 52); CountDownLatch completedFound = new CountDownLatch(1); spanProcessor.notifyIfCondition(completedFound, span -> { - if (!span.getName().equals("ServiceBus.complete")) { + if (!span.getName().equals("ServiceBus.process")) { return false; } - List links = span.toSpanData().getLinks(); - return links.size() > 0 && links.get(0).getSpanContext().getSpanId().equals(message1SpanId); + + return span.getParentSpanContext().getSpanId().equals(message1SpanId); }); - AtomicReference currentInProcess = new AtomicReference<>(Span.getInvalid()); + AtomicReference currentInProcess = new AtomicReference<>(); AtomicReference receivedMessage = new AtomicReference<>(); processor = new ServiceBusClientBuilder() .connectionString(getConnectionString()) @@ -446,8 +466,8 @@ public void sendAndProcess() throws InterruptedException { .queueName(getQueueName(0)) .processMessage(mc -> { if (mc.getMessage().getMessageId().equals(messageId)) { - currentInProcess.set(Span.current()); - receivedMessage.set(mc.getMessage()); + currentInProcess.compareAndSet(null, Span.current()); + receivedMessage.compareAndSet(null, mc.getMessage()); } }) .processError(e -> { @@ -455,6 +475,7 @@ public void sendAndProcess() throws InterruptedException { }) .buildProcessorClient(); + toClose.add(() -> processor.stop()); processor.start(); assertTrue(completedFound.await(20, TimeUnit.SECONDS)); processor.stop(); @@ -463,11 +484,11 @@ public void sendAndProcess() throws InterruptedException { List spans = spanProcessor.getEndedSpans(); assertMessageSpan(spans.get(0), message); - assertSendSpan(spans.get(1), Collections.singletonList(message), "ServiceBus.send"); + assertClientProducerSpan(spans.get(1), Collections.singletonList(message), "ServiceBus.send", "publish"); assertEquals(0, findSpans(spans, "ServiceBus.consume").size()); List processed = findSpans(spans, "ServiceBus.process") - .stream().filter(p -> p == currentInProcess.get()).collect(Collectors.toList()); + .stream().filter(p -> p.equals(currentInProcess.get())).collect(Collectors.toList()); assertEquals(1, processed.size()); assertConsumerSpan(processed.get(0), receivedMessage.get(), "ServiceBus.process"); @@ -478,7 +499,7 @@ public void sendAndProcess() throws InterruptedException { }) .collect(Collectors.toList()); assertEquals(1, completed.size()); - assertSendSpan(completed.get(0), Collections.singletonList(message), "ServiceBus.complete"); + assertClientProducerSpan(completed.get(0), Collections.singletonList(message), "ServiceBus.complete", "settle"); assertParentFound(completed.get(0), processed); } @@ -512,7 +533,7 @@ public void sendAndProcessParallel() throws InterruptedException { }) .processError(e -> fail("unexpected error", e.getException())) .buildProcessorClient(); - + toClose.add(() -> processor.stop()); processor.start(); assertTrue(processedFound.await(10, TimeUnit.SECONDS)); processor.stop(); @@ -558,7 +579,7 @@ public void sendAndProcessParallelNoAutoComplete() throws InterruptedException { }) .processError(e -> fail("unexpected error", e.getException())) .buildProcessorClient(); - + toClose.add(() -> processor.stop()); processor.start(); assertTrue(completedFound.await(20, TimeUnit.SECONDS)); processor.stop(); @@ -587,7 +608,7 @@ public void sendProcessAndFail() throws InterruptedException { CountDownLatch messageProcessed = new CountDownLatch(1); spanProcessor.notifyIfCondition(messageProcessed, span -> - span.getName() == "ServiceBus.process" && span.getParentSpanContext().getSpanId().equals(message1SpanId)); + "ServiceBus.process".equals(span.getName()) && span.getParentSpanContext().getSpanId().equals(message1SpanId)); AtomicReference receivedMessage = new AtomicReference<>(); processor = new ServiceBusClientBuilder() @@ -596,13 +617,13 @@ public void sendProcessAndFail() throws InterruptedException { .queueName(getQueueName(0)) .processMessage(mc -> { if (mc.getMessage().getMessageId().equals(messageId)) { - receivedMessage.set(mc.getMessage()); + receivedMessage.compareAndSet(null, mc.getMessage()); throw new RuntimeException("foo"); } }) .processError(e -> { }) .buildProcessorClient(); - + toClose.add(() -> processor.stop()); processor.start(); assertTrue(messageProcessed.await(10, TimeUnit.SECONDS)); processor.stop(); @@ -619,7 +640,7 @@ public void sendProcessAndFail() throws InterruptedException { .filter(c -> c.toSpanData().getLinks().get(0).getSpanContext().getSpanId().equals(message1SpanId)) .collect(Collectors.toList()); assertEquals(1, abandoned.size()); - assertSendSpan(abandoned.get(0), Collections.singletonList(message), "ServiceBus.abandon"); + assertClientProducerSpan(abandoned.get(0), Collections.singletonList(message), "ServiceBus.abandon", "settle"); assertParentFound(abandoned.get(0), processed); } @@ -633,23 +654,29 @@ public void scheduleAndCancelMessage() { List spans = spanProcessor.getEndedSpans(); assertMessageSpan(spans.get(0), message); - assertSendSpan(spans.get(1), Collections.singletonList(message), "ServiceBus.scheduleMessage"); - assertSendSpan(spans.get(2), Collections.emptyList(), "ServiceBus.cancelScheduledMessage"); + assertClientProducerSpan(spans.get(1), Collections.singletonList(message), "ServiceBus.scheduleMessage", "publish"); + assertClientProducerSpan(spans.get(2), Collections.emptyList(), "ServiceBus.cancelScheduledMessage", null); } private void assertMessageSpan(ReadableSpan actual, ServiceBusMessage message) { assertEquals("ServiceBus.message", actual.getName()); assertEquals(SpanKind.PRODUCER, actual.getKind()); + assertNull(actual.getAttribute(AttributeKey.stringKey("messaging.operation"))); String traceparent = "00-" + actual.getSpanContext().getTraceId() + "-" + actual.getSpanContext().getSpanId() + "-01"; assertEquals(message.getApplicationProperties().get("Diagnostic-Id"), traceparent); assertEquals(message.getApplicationProperties().get("traceparent"), traceparent); } - private void assertSendSpan(ReadableSpan actual, List messages, String spanName) { + private void assertClientProducerSpan(ReadableSpan actual, List messages, String spanName, String operationName) { assertEquals(spanName, actual.getName()); assertEquals(SpanKind.CLIENT, actual.getKind()); List links = actual.toSpanData().getLinks(); assertEquals(messages.size(), links.size()); + assertEquals(operationName, actual.getAttribute(AttributeKey.stringKey("messaging.operation"))); + if (messages.size() > 1) { + assertEquals(messages.size(), actual.getAttribute(AttributeKey.longKey("messaging.batch.message_count"))); + } + for (int i = 0; i < links.size(); i++) { String messageTraceparent = (String) messages.get(i).getApplicationProperties().get("traceparent"); SpanContext linkContext = links.get(i).getSpanContext(); @@ -658,18 +685,21 @@ private void assertSendSpan(ReadableSpan actual, List message } } - private void assertReceiveSpan(ReadableSpan actual, List messages, String spanName) { + private void assertClientSpan(ReadableSpan actual, List messages, String spanName, String operationName) { assertEquals(spanName, actual.getName()); assertEquals(SpanKind.CLIENT, actual.getKind()); List links = actual.toSpanData().getLinks(); assertEquals(messages.size(), links.size()); + assertEquals(operationName, actual.getAttribute(AttributeKey.stringKey("messaging.operation"))); + if (messages.size() > 1) { + assertEquals(messages.size(), actual.getAttribute(AttributeKey.longKey("messaging.batch.message_count"))); + } for (int i = 0; i < links.size(); i++) { String messageTraceparent = (String) messages.get(i).getApplicationProperties().get("traceparent"); SpanContext linkContext = links.get(i).getSpanContext(); String linkTraceparent = "00-" + linkContext.getTraceId() + "-" + linkContext.getSpanId() + "-01"; assertEquals(messageTraceparent, linkTraceparent); - // TODO (lmolkova) uncomment after otel 1.0.0-beta.29 ships - // assertNotNull(links.get(i).getAttributes().get(AttributeKey.longKey(Tracer.MESSAGE_ENQUEUED_TIME))); + assertNotNull(links.get(i).getAttributes().get(AttributeKey.longKey(ServiceBusTracer.MESSAGE_ENQUEUED_TIME_ATTRIBUTE_NAME))); } } @@ -677,7 +707,7 @@ private void assertConsumerSpan(ReadableSpan actual, ServiceBusReceivedMessage m assertEquals(spanName, actual.getName()); assertEquals(SpanKind.CONSUMER, actual.getKind()); assertEquals(0, actual.toSpanData().getLinks().size()); - + assertEquals("process", actual.getAttribute(AttributeKey.stringKey("messaging.operation"))); String messageTraceparent = (String) message.getApplicationProperties().get("traceparent"); String parent = "00-" + actual.getSpanContext().getTraceId() + "-" + actual.getParentSpanContext().getSpanId() + "-01"; assertEquals(messageTraceparent, parent); @@ -744,14 +774,15 @@ public boolean isStartRequired() { @Override public void onEnd(ReadableSpan readableSpan) { assertEquals("Microsoft.ServiceBus", readableSpan.getAttribute(AttributeKey.stringKey("az.namespace"))); - assertEquals(entityName, readableSpan.getAttribute(AttributeKey.stringKey("message_bus.destination"))); - assertEquals(namespace, readableSpan.getAttribute(AttributeKey.stringKey("peer.address"))); + assertEquals("servicebus", readableSpan.getAttribute(AttributeKey.stringKey("messaging.system"))); + assertEquals(entityName, readableSpan.getAttribute(AttributeKey.stringKey("messaging.destination.name"))); + assertEquals(namespace, readableSpan.getAttribute(AttributeKey.stringKey("net.peer.name"))); + spans.add(readableSpan); Consumer filter = notifier.get(); if (filter != null) { filter.accept(readableSpan); } - spans.add(readableSpan); } public void notifyIfCondition(CountDownLatch countDownLatch, Predicate filter) { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationAsyncClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationAsyncClientIntegrationTest.java index cd0e8899f98fc..df10cfb78ad5a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationAsyncClientIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationAsyncClientIntegrationTest.java @@ -476,7 +476,9 @@ void deleteQueue(HttpClient httpClient) { final ServiceBusAdministrationAsyncClient client = createClient(httpClient); final String queueName = testResourceNamer.randomName("sub", 10); - client.createQueue(queueName).block(TIMEOUT); + client.createQueue(queueName) + .onErrorResume(ResourceExistsException.class, e -> Mono.empty()) + .block(TIMEOUT); // Act & Assert StepVerifier.create(client.deleteQueue(queueName)) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationClientIntegrationTest.java index c731b5b478f5a..857090bad73cf 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationClientIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationClientIntegrationTest.java @@ -12,8 +12,6 @@ import com.azure.core.http.rest.PagedIterable; import com.azure.core.http.rest.Response; import com.azure.core.test.TestBase; -import com.azure.core.test.TestMode; -import com.azure.core.test.implementation.TestingHelpers; import com.azure.messaging.servicebus.TestUtils; import com.azure.messaging.servicebus.administration.implementation.models.ServiceBusManagementErrorException; import com.azure.messaging.servicebus.administration.models.AccessRights; @@ -37,7 +35,6 @@ import com.azure.messaging.servicebus.administration.models.TopicProperties; import com.azure.messaging.servicebus.administration.models.TopicRuntimeProperties; import com.azure.messaging.servicebus.administration.models.TrueRuleFilter; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -46,13 +43,23 @@ import java.time.OffsetDateTime; import java.util.Collections; import java.util.List; -import java.util.Locale; import java.util.Optional; import java.util.regex.Matcher; import java.util.regex.Pattern; -import static com.azure.messaging.servicebus.TestUtils.*; -import static org.junit.jupiter.api.Assertions.*; +import static com.azure.messaging.servicebus.TestUtils.assertAuthorizationRules; +import static com.azure.messaging.servicebus.TestUtils.getConnectionString; +import static com.azure.messaging.servicebus.TestUtils.getEntityName; +import static com.azure.messaging.servicebus.TestUtils.getQueueBaseName; +import static com.azure.messaging.servicebus.TestUtils.getRuleBaseName; +import static com.azure.messaging.servicebus.TestUtils.getSubscriptionBaseName; +import static com.azure.messaging.servicebus.TestUtils.getTopicBaseName; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assumptions.assumeTrue; /** @@ -62,44 +69,6 @@ public class ServiceBusAdministrationClientIntegrationTest extends TestBase { protected static final Duration TIMEOUT = Duration.ofSeconds(20); - @AfterAll - static void cleanup() { - - if (TestingHelpers.getTestMode() == TestMode.PLAYBACK) { - return; - } - final ServiceBusAdministrationClient client = new ServiceBusAdministrationClientBuilder() - .connectionString(getConnectionString(false)) - .buildClient(); - // Clear all queues - client.listQueues().stream() - .filter(queueProperties -> !queueProperties.getName().toLowerCase(Locale.ROOT) - .equals(getEntityName(getQueueBaseName(), 5))) - .forEach(property -> client.deleteQueue(property.getName())); - - //Clear all topics - client.listTopics().stream() - .filter(properties -> !(properties.getName().toLowerCase(Locale.ROOT) - .equals(getEntityName(getTopicBaseName(), 2)) - || properties.getName().toLowerCase(Locale.ROOT) - .equals(getEntityName(getTopicBaseName(), 1)))) - .forEach(property -> client.deleteTopic(property.getName())); - - //Clear all subscriptions - final String topicName = getEntityName(getTopicBaseName(), 2); - client.listSubscriptions(topicName).stream() - .filter(properties -> !properties.getSubscriptionName().toLowerCase(Locale.ROOT) - .equals(getEntityName(getSubscriptionBaseName(), 2))) - .forEach(property -> client.deleteSubscription(topicName, property.getSubscriptionName())); - - //Clear rules in subscription - final String subscriptionName = getEntityName(getSubscriptionBaseName(), 2); - client.listRules(topicName, subscriptionName).stream() - .filter(properties -> !properties.getName().toLowerCase(Locale.ROOT) - .equals(getEntityName(getRuleBaseName(), 2))) - .forEach(property -> client.deleteRule(topicName, subscriptionName, property.getName())); - } - /** * Test to connect to the service bus with an azure sas credential. * ServiceBusSharedKeyCredential doesn't need a specific test method because other tests below @@ -118,7 +87,7 @@ void azureSasCredentialsTest() { Matcher matcher = sasPattern.matcher(connectionString); assertTrue(matcher.find(), "Couldn't find SAS from connection string"); ServiceBusAdministrationClient client = new ServiceBusAdministrationClientBuilder() - .endpoint(fullyQualifiedDomainName) + .endpoint("https://" + fullyQualifiedDomainName) .credential(new AzureSasCredential(matcher.group(1))) .buildClient(); NamespaceProperties np = client.getNamespaceProperties(); @@ -130,7 +99,7 @@ void createQueue() { final ServiceBusAdministrationClient client = getClient(); final String queueName = interceptorManager.isPlaybackMode() ? "queue-2" - : getEntityName(getQueueBaseName(), 2); + : testResourceNamer.randomName("queue", 10); final String forwardToEntityName = interceptorManager.isPlaybackMode() ? "queue-5" : getEntityName(getQueueBaseName(), 5); @@ -175,9 +144,6 @@ void createQueue() { assertEquals(0, runtimeProperties.getTotalMessageCount()); assertEquals(0, runtimeProperties.getSizeInBytes()); assertNotNull(runtimeProperties.getCreatedAt()); - - //cleanup - //client.deleteQueue(queueName); } @Test @@ -185,7 +151,7 @@ void createTopicWithResponse() { final ServiceBusAdministrationClient client = getClient(); final String topicName = interceptorManager.isPlaybackMode() ? "topic-3" - : getEntityName(getTopicBaseName(), 3); + : testResourceNamer.randomName("test", 10); final CreateTopicOptions expected = new CreateTopicOptions() .setMaxSizeInMegabytes(2048L) .setDuplicateDetectionRequired(true) @@ -207,8 +173,6 @@ void createTopicWithResponse() { assertEquals(0, runtimeProperties.getSubscriptionCount()); assertEquals(0, runtimeProperties.getSizeInBytes()); assertNotNull(runtimeProperties.getCreatedAt()); - - //client.deleteTopic(topicName); } @Test @@ -252,7 +216,7 @@ void createRule() { : getEntityName(getTopicBaseName(), 2); final String subscriptionName = interceptorManager.isPlaybackMode() ? "subscription-2" - : getEntityName(getSubscriptionBaseName(), 2); + : getSubscriptionBaseName(); final SqlRuleAction action = new SqlRuleAction("SET Label = 'test'"); final CreateRuleOptions options = new CreateRuleOptions() .setAction(action) @@ -280,7 +244,7 @@ void createRuleDefaults() { : getEntityName(getTopicBaseName(), 2); final String subscriptionName = interceptorManager.isPlaybackMode() ? "subscription-2" - : getEntityName(getSubscriptionBaseName(), 2); + : getSubscriptionBaseName(); final RuleProperties rule = client.createRule(topicName, subscriptionName, ruleName); assertEquals(ruleName, rule.getName()); @@ -298,7 +262,7 @@ void createRuleResponse() { : getEntityName(getTopicBaseName(), 2); final String subscriptionName = interceptorManager.isPlaybackMode() ? "subscription-2" - : getEntityName(getSubscriptionBaseName(), 2); + : getSubscriptionBaseName(); final SqlRuleFilter filter = new SqlRuleFilter("sys.To='foo' OR sys.MessageId IS NULL"); final CreateRuleOptions options = new CreateRuleOptions() @@ -344,7 +308,7 @@ void createSubscriptionExistingName() { : getEntityName(getTopicBaseName(), 2); final String subscriptionName = interceptorManager.isPlaybackMode() ? "subscription-2" - : getEntityName(getSubscriptionBaseName(), 2); + : getSubscriptionBaseName(); final ServiceBusAdministrationClient client = getClient(); ServiceBusManagementErrorException exception = assertThrows(ServiceBusManagementErrorException.class, @@ -363,7 +327,7 @@ void updateRuleResponse() { : getEntityName(getTopicBaseName(), 2); final String subscriptionName = interceptorManager.isPlaybackMode() ? "subscription-2" - : getEntityName(getSubscriptionBaseName(), 2); + : getSubscriptionBaseName(); final SqlRuleAction expectedAction = new SqlRuleAction("SET MessageId = 'matching-id'"); final SqlRuleFilter expectedFilter = new SqlRuleFilter("sys.To = 'telemetry-event'"); @@ -497,7 +461,7 @@ void getTopicDoesNotExist() { final ServiceBusAdministrationClient client = getClient(); final String topicName = interceptorManager.isPlaybackMode() ? "topic-99" - : getEntityName(getTopicBaseName(), 99); + : testResourceNamer.randomName("topic", 10); assertThrows(ResourceNotFoundException.class, () -> client.getTopic(topicName), "Topic exists! But should not. Incorrect getTopic behavior"); @@ -518,7 +482,7 @@ void getTopicExistsFalse() { final ServiceBusAdministrationClient client = getClient(); final String topicName = interceptorManager.isPlaybackMode() ? "topic-99" - : getEntityName(getTopicBaseName(), 99); + : testResourceNamer.randomName(getTopicBaseName(), 10); assertFalse(client.getTopicExists(topicName)); } @@ -551,7 +515,7 @@ void getSubscription() { : getEntityName(getTopicBaseName(), 2); final String subscriptionName = interceptorManager.isPlaybackMode() ? "subscription-2" - : getEntityName(getSubscriptionBaseName(), 2); + : getSubscriptionBaseName(); final OffsetDateTime nowUtc = OffsetDateTime.now(Clock.systemUTC()); final SubscriptionProperties properties = client.getSubscription(topicName, subscriptionName); @@ -575,7 +539,7 @@ void getSubscriptionDoesNotExist() { : getEntityName(getTopicBaseName(), 2); final String subscriptionName = interceptorManager.isPlaybackMode() ? "subscription-99" - : getEntityName(getSubscriptionBaseName(), 99); + : testResourceNamer.randomName(getSubscriptionBaseName(), 10); ServiceBusManagementErrorException exception = assertThrows(ServiceBusManagementErrorException.class, () -> client.getSubscription(topicName, subscriptionName), @@ -591,7 +555,7 @@ void getSubscriptionExists() { : getEntityName(getTopicBaseName(), 2); final String subscriptionName = interceptorManager.isPlaybackMode() ? "subscription-2" - : getEntityName(getSubscriptionBaseName(), 2); + : getSubscriptionBaseName(); assertTrue(client.getSubscriptionExists(topicName, subscriptionName)); } @@ -604,7 +568,7 @@ void getSubscriptionRuntimeProperties() { : getEntityName(getTopicBaseName(), 2); final String subscriptionName = interceptorManager.isPlaybackMode() ? "subscription-2" - : getEntityName(getSubscriptionBaseName(), 2); + : getSubscriptionBaseName(); final OffsetDateTime nowUtc = OffsetDateTime.now(Clock.systemUTC()); final SubscriptionRuntimeProperties properties = client.getSubscriptionRuntimeProperties(topicName, subscriptionName); @@ -648,7 +612,7 @@ void getSubscriptionRuntimePropertiesUnauthorizedClient() { : getEntityName(getTopicBaseName(), 2); final String subscriptionName = interceptorManager.isPlaybackMode() ? "subscription-2" - : getEntityName(getSubscriptionBaseName(), 2); + : getSubscriptionBaseName(); ServiceBusManagementErrorException exception = assertThrows(ServiceBusManagementErrorException.class, () -> client.getSubscriptionRuntimeProperties(topicName, subscriptionName), @@ -662,13 +626,13 @@ void getRule() { final String ruleName = interceptorManager.isPlaybackMode() ? "rule-2" - : getEntityName(getRuleBaseName(), 2); + : "$Default"; final String topicName = interceptorManager.isPlaybackMode() ? "topic-2" : getEntityName(getTopicBaseName(), 2); final String subscriptionName = interceptorManager.isPlaybackMode() ? "subscription-2" - : getEntityName(getSubscriptionBaseName(), 2); + : getSubscriptionBaseName(); final Response response = client.getRuleWithResponse(topicName, subscriptionName, ruleName, null); assertEquals(200, response.getStatusCode()); @@ -692,8 +656,6 @@ void deleteQueue() { ? "queue-9" : getEntityName(getQueueBaseName(), 9); - client.createQueue(queueName); - client.deleteQueue(queueName); } @@ -708,7 +670,7 @@ void deleteRule() { : getEntityName(getTopicBaseName(), 2); final String subscriptionName = interceptorManager.isPlaybackMode() ? "subscription-2" - : getEntityName(getSubscriptionBaseName(), 2); + : getSubscriptionBaseName(); client.createRule(topicName, subscriptionName, ruleName); client.deleteRule(topicName, subscriptionName, ruleName); @@ -722,7 +684,7 @@ void deleteSubscription() { : getEntityName(getTopicBaseName(), 2); final String subscriptionName = interceptorManager.isPlaybackMode() ? "subscription-9" - : getEntityName(getSubscriptionBaseName(), 9); + : testResourceNamer.randomName(getSubscriptionBaseName(), 10); client.createSubscription(topicName, subscriptionName); @@ -735,7 +697,6 @@ void deleteTopic() { final String topicName = interceptorManager.isPlaybackMode() ? "topic-9" : getEntityName(getTopicBaseName(), 9); - client.createTopic(topicName); client.deleteTopic(topicName); } @@ -789,13 +750,13 @@ void listRules() { final String ruleName = interceptorManager.isPlaybackMode() ? "rule-2" - : getEntityName(getRuleBaseName(), 2); + : "$Default"; final String topicName = interceptorManager.isPlaybackMode() ? "topic-2" : getEntityName(getTopicBaseName(), 2); final String subscriptionName = interceptorManager.isPlaybackMode() ? "subscription-2" - : getEntityName(getSubscriptionBaseName(), 2); + : getSubscriptionBaseName(); PagedIterable ruleProperties = client.listRules(topicName, subscriptionName);