Skip to content

Commit

Permalink
[ServiceBus] migrate to @azure/core-tracing v1.0.0 (#22517)
Browse files Browse the repository at this point in the history
* Move to `core-tracing` GA version ^1.0.0

* Update tracing support

* React to tracing changes in v1.0.0

* Remove test that no longer needed as the method was removed

* Fix util unit tests

* Remove createServiceBusSpan, createProcessingSpan, and createAndEndProcessingSpan

* Update changelog

* Remove deprecated `TryAddOptions.parentSpan` property

* Bump minor version
  • Loading branch information
jeremymeng authored Jul 12, 2022
1 parent e11e57b commit 8bbc58b
Show file tree
Hide file tree
Showing 19 changed files with 1,063 additions and 2,223 deletions.
4 changes: 3 additions & 1 deletion sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Release History

## 7.6.1 (Unreleased)
## 7.7.0 (Unreleased)

### Features Added

Expand All @@ -10,6 +10,8 @@

### Other Changes

- Updated our `@azure/core-tracing` dependency to the latest version (v1.0.0)

## 7.6.0 (2022-07-07)

### Other Changes
Expand Down
4 changes: 2 additions & 2 deletions sdk/servicebus/service-bus/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "@azure/service-bus",
"sdk-type": "client",
"author": "Microsoft Corporation",
"version": "7.6.1",
"version": "7.7.0",
"license": "MIT",
"description": "Azure Service Bus SDK for JavaScript",
"homepage": "https://github.com/Azure/azure-sdk-for-js/tree/main/sdk/servicebus/service-bus/",
Expand Down Expand Up @@ -110,7 +110,7 @@
"@azure/core-auth": "^1.3.0",
"@azure/core-client": "^1.0.0",
"@azure/core-rest-pipeline": "^1.1.0",
"@azure/core-tracing": "1.0.0-preview.13",
"@azure/core-tracing": "^1.0.0",
"@azure/core-paging": "^1.1.1",
"@azure/core-util": "^1.0.0",
"@azure/core-xml": "^1.0.0",
Expand Down
7 changes: 2 additions & 5 deletions sdk/servicebus/service-bus/review/service-bus.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ import { RetryMode } from '@azure/core-amqp';
import { RetryOptions } from '@azure/core-amqp';
import { SASCredential } from '@azure/core-auth';
import { ServiceClient } from '@azure/core-client';
import { Span } from '@azure/core-tracing';
import { SpanContext } from '@azure/core-tracing';
import { TokenCredential } from '@azure/core-auth';
import { TokenType } from '@azure/core-amqp';
import { TracingContext } from '@azure/core-tracing';
import { UserAgentPolicyOptions } from '@azure/core-rest-pipeline';
import { WebSocketImpl } from 'rhea-promise';
import { WebSocketOptions } from '@azure/core-amqp';
Expand Down Expand Up @@ -448,7 +447,7 @@ export interface ServiceBusMessageBatch {
_generateMessage(): Buffer;
readonly maxSizeInBytes: number;
// @internal
readonly _messageSpanContexts: SpanContext[];
readonly _messageSpanContexts: TracingContext[];
readonly sizeInBytes: number;
tryAddMessage(message: ServiceBusMessage | AmqpAnnotatedMessage, options?: TryAddOptions): boolean;
}
Expand Down Expand Up @@ -637,8 +636,6 @@ export type TransferProgressEvent = {

// @public
export interface TryAddOptions {
// @deprecated (undocumented)
parentSpan?: Span | SpanContext | null;
tracingOptions?: OperationTracingOptions;
}

Expand Down
18 changes: 8 additions & 10 deletions sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import { throwErrorIfConnectionClosed } from "../util/errors";
import { AbortSignalLike } from "@azure/abort-controller";
import { checkAndRegisterWithAbortSignal } from "../util/utils";
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
import { createAndEndProcessingSpan } from "../diagnostics/instrumentServiceBusMessage";
import { toProcessingSpanOptions } from "../diagnostics/instrumentServiceBusMessage";
import { ReceiveMode } from "../models";
import { ServiceBusError, translateServiceBusError } from "../serviceBusError";
import { tracingClient } from "../diagnostics/tracing";

/**
* Describes the batching receiver where the user can receive a specified number of messages for
Expand Down Expand Up @@ -238,11 +239,6 @@ interface ReceiveMessageArgs extends OperationOptionsBase {
* @internal
*/
export class BatchingReceiverLite {
/**
* NOTE: exists only to make unit testing possible.
*/
private _createAndEndProcessingSpan: typeof createAndEndProcessingSpan;

constructor(
private _connectionContext: ConnectionContext,
public entityPath: string,
Expand All @@ -252,8 +248,6 @@ export class BatchingReceiverLite {
private _receiveMode: ReceiveMode,
_skipParsingBodyAsJson: boolean
) {
this._createAndEndProcessingSpan = createAndEndProcessingSpan;

this._createServiceBusMessage = (context: MessageAndDelivery) => {
return new ServiceBusMessageImpl(
context.message!,
Expand Down Expand Up @@ -301,8 +295,12 @@ export class BatchingReceiverLite {
const messages = await new Promise<ServiceBusMessageImpl[]>((resolve, reject) =>
this._receiveMessagesImpl(receiver, args, resolve, reject)
);
this._createAndEndProcessingSpan(messages, this, this._connectionContext.config, args);
return messages;
return tracingClient.withSpan(
"BatchingReceiverLite.process",
args,
() => messages,
toProcessingSpanOptions(messages, this, this._connectionContext.config)
);
} finally {
this._closeHandler = undefined;
this.isReceivingMessages = false;
Expand Down
13 changes: 9 additions & 4 deletions sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
RetryOptions,
ConditionErrorNameMapper,
} from "@azure/core-amqp";
import { OperationOptionsBase, trace } from "../modelsToBeSharedWithEventHubs";
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
import { receiverLogger as logger } from "../log";
import { AmqpError, EventContext, OnAmqpEvent } from "rhea-promise";
import { ServiceBusMessageImpl } from "../serviceBusMessage";
Expand All @@ -27,8 +27,9 @@ import {
ProcessErrorArgs,
SubscribeOptions,
} from "../models";
import { createProcessingSpan } from "../diagnostics/instrumentServiceBusMessage";
import { toProcessingSpanOptions } from "../diagnostics/instrumentServiceBusMessage";
import { AbortError } from "@azure/abort-controller";
import { tracingClient } from "../diagnostics/tracing";

/**
* @internal
Expand Down Expand Up @@ -484,8 +485,12 @@ export class StreamingReceiver extends MessageReceiver {
},
processMessage: async (message: ServiceBusMessageImpl) => {
try {
const span = createProcessingSpan(message, this, this._context.config, operationOptions);
return await trace(() => userHandlers.processMessage(message), span);
await tracingClient.withSpan(
"StreamReceiver.process",
operationOptions ?? {},
() => userHandlers.processMessage(message),
toProcessingSpanOptions(message, this, this._context.config)
);
} catch (err: any) {
this._messageHandlers().processError({
error: err,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,39 +1,114 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import {
extractSpanContextFromTraceParentHeader,
SpanStatusCode,
Link,
Span,
SpanContext,
SpanKind,
} from "@azure/core-tracing";
import { TracingContext, TracingSpanLink, TracingSpanOptions } from "@azure/core-tracing";
import { ConnectionContext } from "../connectionContext";
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
import { ServiceBusReceiver } from "../receivers/receiver";
import { ServiceBusMessage, ServiceBusReceivedMessage } from "../serviceBusMessage";
import { createServiceBusSpan } from "./tracing";
import { toSpanOptions, tracingClient } from "./tracing";

/**
* @hidden
* @internal
*/
export const TRACEPARENT_PROPERTY = "Diagnostic-Id";

/**
* @hidden
*/
export interface InstrumentableMessage {
/**
* The application specific properties which can be
* used for custom message metadata.
*/
applicationProperties?: { [key: string]: number | boolean | string | Date | null };
}

/**
* Instruments an AMQP message with a proper `Diagnostic-Id` for tracing.
*
* @hidden
*/
export function instrumentMessage<T extends InstrumentableMessage>(
message: T,
options: OperationOptionsBase,
entityPath: string,
host: string
): {
/**
* If instrumentation was done, a copy of the message with
* message.applicationProperties['Diagnostic-Id'] filled
* out appropriately.
*/
message: T;

/**
* A valid SpanContext if this message should be linked to a parent span, or undefined otherwise.
*/
spanContext: TracingContext | undefined;
} {
// check if the event has already been instrumented
const previouslyInstrumented = Boolean(message.applicationProperties?.[TRACEPARENT_PROPERTY]);

if (previouslyInstrumented) {
return {
message,
spanContext: undefined,
};
}

const { span: messageSpan, updatedOptions } = tracingClient.startSpan(
"message",
options,
toSpanOptions({ entityPath, host }, "producer")
);

try {
if (!messageSpan.isRecording()) {
return {
message,
spanContext: undefined,
};
}

const traceParent = tracingClient.createRequestHeaders(
updatedOptions.tracingOptions?.tracingContext
)["traceparent"];

if (traceParent) {
// create a copy so the original isn't modified
message = {
...message,
applicationProperties: {
...message.applicationProperties,
[TRACEPARENT_PROPERTY]: traceParent,
},
};
}

return {
message,
spanContext: updatedOptions.tracingOptions?.tracingContext,
};
} finally {
messageSpan.end();
}
}

/**
* Extracts the `SpanContext` from an `ServiceBusMessage` if the context exists.
* @param message - An individual `ServiceBusMessage` object.
* @internal
*/
export function extractSpanContextFromServiceBusMessage(
message: ServiceBusMessage
): SpanContext | undefined {
): TracingContext | undefined {
if (!message.applicationProperties || !message.applicationProperties[TRACEPARENT_PROPERTY]) {
return;
}

const diagnosticId = message.applicationProperties[TRACEPARENT_PROPERTY] as string;
return extractSpanContextFromTraceParentHeader(diagnosticId);
return tracingClient.parseTraceparentHeader(diagnosticId);
}

/**
Expand All @@ -56,70 +131,28 @@ function* getReceivedMessages(
}

/**
* A span that encompasses the period when the message has been received and
* is being processed.
*
* NOTE: The amount of time the user would be considered processing the message is
* not always clear - in that case the span will have a very short lifetime
* since we'll start the span when we receive the message and end it when we
* give the message to the user.
*
* @internal
*/
export function createProcessingSpan(
export function toProcessingSpanOptions(
receivedMessages: ServiceBusReceivedMessage | ServiceBusReceivedMessage[],
// NOTE: the connectionConfig also has an entityPath property but that only
// represents the optional entityPath in their connection string which is NOT
// what we want for tracing.
receiver: Pick<ServiceBusReceiver, "entityPath">,
connectionConfig: Pick<ConnectionContext["config"], "host">,
options?: OperationOptionsBase
): Span {
const links: Link[] = [];

connectionConfig: Pick<ConnectionContext["config"], "host">
): TracingSpanOptions {
const spanLinks: TracingSpanLink[] = [];
for (const receivedMessage of getReceivedMessages(receivedMessages)) {
const spanContext = extractSpanContextFromServiceBusMessage(receivedMessage);

if (spanContext == null) {
continue;
const tracingContext = extractSpanContextFromServiceBusMessage(receivedMessage);
if (tracingContext) {
spanLinks.push({
tracingContext,
attributes: {
enqueuedTime: receivedMessage.enqueuedTimeUtc?.getTime(),
},
});
}

links.push({
context: spanContext,
attributes: {
enqueuedTime: receivedMessage.enqueuedTimeUtc?.getTime(),
},
});
}

const { span } = createServiceBusSpan(
"process",
options,
receiver.entityPath,
connectionConfig.host,
{
kind: SpanKind.CONSUMER,
links,
}
);

return span;
}

/**
* Creates and immediately ends a processing span. Used when
* the 'processing' occurs outside of our control so we don't
* know the scope.
*
* @internal
*/
export function createAndEndProcessingSpan(
receivedMessages: ServiceBusReceivedMessage | ServiceBusReceivedMessage[],
receiver: Pick<ServiceBusReceiver, "entityPath">,
connectionConfig: Pick<ConnectionContext["config"], "host">,
options?: OperationOptionsBase
): void {
const span = createProcessingSpan(receivedMessages, receiver, connectionConfig, options);
span.setStatus({ code: SpanStatusCode.OK });
span.end();
return {
spanLinks,
spanKind: "consumer",
...toSpanOptions({ host: connectionConfig.host, entityPath: receiver.entityPath }),
};
}
Loading

0 comments on commit 8bbc58b

Please sign in to comment.